Use ArrayDeque instead of LinkedList
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActorServerConfigurationSupport.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import akka.actor.ActorRef;
11 import akka.actor.ActorSelection;
12 import akka.actor.Cancellable;
13 import com.google.common.base.Preconditions;
14 import java.util.ArrayDeque;
15 import java.util.Queue;
16 import java.util.UUID;
17 import javax.annotation.Nullable;
18 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
19 import org.opendaylight.controller.cluster.raft.base.messages.SnapshotComplete;
20 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
21 import org.opendaylight.controller.cluster.raft.messages.AddServer;
22 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
23 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
24 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
25 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
26 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
27 import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
28 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31
32 /**
33  * Handles server configuration related messages for a RaftActor.
34  *
35  * @author Thomas Pantelis
36  */
37 class RaftActorServerConfigurationSupport {
38     private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupport.class);
39
40     private final OperationState IDLE = new Idle();
41
42     private final RaftActor raftActor;
43
44     private final RaftActorContext raftContext;
45
46     private final Queue<ServerOperationContext<?>> pendingOperationsQueue = new ArrayDeque<>();
47
48     private OperationState currentOperationState = IDLE;
49
50     RaftActorServerConfigurationSupport(RaftActor raftActor) {
51         this.raftActor = raftActor;
52         this.raftContext = raftActor.getRaftActorContext();
53     }
54
55     boolean handleMessage(Object message, ActorRef sender) {
56         if(message instanceof AddServer) {
57             onAddServer((AddServer) message, sender);
58             return true;
59         } else if(message instanceof RemoveServer) {
60             onRemoveServer((RemoveServer) message, sender);
61             return true;
62         } else if (message instanceof ServerOperationTimeout) {
63             currentOperationState.onServerOperationTimeout((ServerOperationTimeout) message);
64             return true;
65         } else if (message instanceof UnInitializedFollowerSnapshotReply) {
66             currentOperationState.onUnInitializedFollowerSnapshotReply((UnInitializedFollowerSnapshotReply) message);
67             return true;
68         } else if(message instanceof ApplyState) {
69             return onApplyState((ApplyState) message);
70         } else if(message instanceof SnapshotComplete) {
71             currentOperationState.onSnapshotComplete();
72             return false;
73         } else {
74             return false;
75         }
76     }
77
78     private void onRemoveServer(RemoveServer removeServer, ActorRef sender) {
79         LOG.debug("{}: onRemoveServer: {}, state: {}", raftContext.getId(), removeServer, currentOperationState);
80         if(removeServer.getServerId().equals(raftActor.getLeaderId())){
81             // Removing current leader is not supported yet
82             // TODO: To properly support current leader removal we need to first implement transfer of leadership
83             LOG.debug("Cannot remove {} replica because it is the Leader", removeServer.getServerId());
84             sender.tell(new RemoveServerReply(ServerChangeStatus.NOT_SUPPORTED, raftActor.getLeaderId()), raftActor.getSelf());
85         } else if(!raftContext.getPeerIds().contains(removeServer.getServerId())) {
86             sender.tell(new RemoveServerReply(ServerChangeStatus.DOES_NOT_EXIST, raftActor.getLeaderId()), raftActor.getSelf());
87         } else {
88             onNewOperation(new RemoveServerContext(removeServer, raftContext.getPeerAddress(removeServer.getServerId()), sender));
89         }
90     }
91
92     private boolean onApplyState(ApplyState applyState) {
93         Payload data = applyState.getReplicatedLogEntry().getData();
94         if(data instanceof ServerConfigurationPayload) {
95             currentOperationState.onApplyState(applyState);
96             return true;
97         }
98
99         return false;
100     }
101
102     /**
103      * The algorithm for AddServer is as follows:
104      * <ul>
105      * <li>Add the new server as a peer.</li>
106      * <li>Add the new follower to the leader.</li>
107      * <li>If new server should be voting member</li>
108      * <ul>
109      *     <li>Initialize FollowerState to VOTING_NOT_INITIALIZED.</li>
110      *     <li>Initiate install snapshot to the new follower.</li>
111      *     <li>When install snapshot complete, mark the follower as VOTING and re-calculate majority vote count.</li>
112      * </ul>
113      * <li>Persist and replicate ServerConfigurationPayload with the new server list.</li>
114      * <li>On replication consensus, respond to caller with OK.</li>
115      * </ul>
116      * If the install snapshot times out after a period of 2 * election time out
117      * <ul>
118      *     <li>Remove the new server as a peer.</li>
119      *     <li>Remove the new follower from the leader.</li>
120      *     <li>Respond to caller with TIMEOUT.</li>
121      * </ul>
122      */
123     private void onAddServer(AddServer addServer, ActorRef sender) {
124         LOG.debug("{}: onAddServer: {}, state: {}", raftContext.getId(), addServer, currentOperationState);
125
126         onNewOperation(new AddServerContext(addServer, sender));
127     }
128
129     private void onNewOperation(ServerOperationContext<?> operationContext) {
130         if (raftActor.isLeader()) {
131             currentOperationState.onNewOperation(operationContext);
132         } else {
133             ActorSelection leader = raftActor.getLeader();
134             if (leader != null) {
135                 LOG.debug("{}: Not leader - forwarding to leader {}", raftContext.getId(), leader);
136                 leader.forward(operationContext.getOperation(), raftActor.getContext());
137             } else {
138                 LOG.debug("{}: No leader - returning NO_LEADER reply", raftContext.getId());
139                 operationContext.getClientRequestor().tell(operationContext.newReply(
140                         ServerChangeStatus.NO_LEADER, null), raftActor.self());
141             }
142         }
143     }
144
145     /**
146      * Interface for a server operation FSM state.
147      */
148     private interface OperationState {
149         void onNewOperation(ServerOperationContext<?> operationContext);
150
151         void onServerOperationTimeout(ServerOperationTimeout timeout);
152
153         void onUnInitializedFollowerSnapshotReply(UnInitializedFollowerSnapshotReply reply);
154
155         void onApplyState(ApplyState applyState);
156
157         void onSnapshotComplete();
158     }
159
    /**
     * Interface for the initial state for a server operation.
     */
    private interface InitialOperationState {
        /**
         * Starts the server operation this state was created for.
         */
        void initiate();
    }
166
167     /**
168      * Abstract base class for a server operation FSM state. Handles common behavior for all states.
169      */
170     private abstract class AbstractOperationState implements OperationState {
171         @Override
172         public void onNewOperation(ServerOperationContext<?> operationContext) {
173             // We're currently processing another operation so queue it to be processed later.
174
175             LOG.debug("{}: Server operation already in progress - queueing {}", raftContext.getId(),
176                     operationContext.getOperation());
177
178             pendingOperationsQueue.add(operationContext);
179         }
180
181         @Override
182         public void onServerOperationTimeout(ServerOperationTimeout timeout) {
183             LOG.debug("onServerOperationTimeout should not be called in state {}", this);
184         }
185
186         @Override
187         public void onUnInitializedFollowerSnapshotReply(UnInitializedFollowerSnapshotReply reply) {
188             LOG.debug("onUnInitializedFollowerSnapshotReply was called in state {}", this);
189         }
190
191         @Override
192         public void onApplyState(ApplyState applyState) {
193             LOG.debug("onApplyState was called in state {}", this);
194         }
195
196         @Override
197         public void onSnapshotComplete() {
198         }
199
200         protected void persistNewServerConfiguration(ServerOperationContext<?> operationContext){
201             raftContext.setDynamicServerConfigurationInUse();
202             ServerConfigurationPayload payload = raftContext.getPeerServerInfo();
203             LOG.debug("{}: New server configuration : {}", raftContext.getId(), payload.getServerConfig());
204
205             raftActor.persistData(operationContext.getClientRequestor(), operationContext.getContextId(), payload);
206
207             currentOperationState = new Persisting(operationContext, newTimer(new ServerOperationTimeout(operationContext.getServerId())));
208
209             sendReply(operationContext, ServerChangeStatus.OK);
210         }
211
212         protected void operationComplete(ServerOperationContext<?> operationContext, @Nullable ServerChangeStatus replyStatus) {
213             if(replyStatus != null) {
214                 sendReply(operationContext, replyStatus);
215             }
216
217             operationContext.operationComplete(raftActor, replyStatus);
218
219             currentOperationState = IDLE;
220
221             ServerOperationContext<?> nextOperation = pendingOperationsQueue.poll();
222             if(nextOperation != null) {
223                 RaftActorServerConfigurationSupport.this.onNewOperation(nextOperation);
224             }
225         }
226
227         protected void sendReply(ServerOperationContext<?> operationContext, ServerChangeStatus status) {
228             LOG.debug("{}: Returning {} for operation {}", raftContext.getId(), status, operationContext.getOperation());
229
230             operationContext.getClientRequestor().tell(operationContext.newReply(status, raftActor.getLeaderId()),
231                     raftActor.self());
232         }
233
234         Cancellable newTimer(Object message) {
235             return raftContext.getActorSystem().scheduler().scheduleOnce(
236                     raftContext.getConfigParams().getElectionTimeOutInterval().$times(2), raftContext.getActor(), message,
237                             raftContext.getActorSystem().dispatcher(), raftContext.getActor());
238         }
239
240         @Override
241         public String toString() {
242             return getClass().getSimpleName();
243         }
244     }
245
246     /**
247      * The state when no server operation is in progress. It immediately initiates new server operations.
248      */
249     private class Idle extends AbstractOperationState {
250         @Override
251         public void onNewOperation(ServerOperationContext<?> operationContext) {
252             operationContext.newInitialOperationState(RaftActorServerConfigurationSupport.this).initiate();
253         }
254
255         @Override
256         public void onApplyState(ApplyState applyState) {
257             // Noop - we override b/c ApplyState is called normally for followers in the idle state.
258         }
259     }
260
261     /**
262      * The state when a new server configuration is being persisted and replicated.
263      */
264     private class Persisting extends AbstractOperationState {
265         private final ServerOperationContext<?> operationContext;
266         private final Cancellable timer;
267         private boolean timedOut = false;
268
269         Persisting(ServerOperationContext<?> operationContext, Cancellable timer) {
270             this.operationContext = operationContext;
271             this.timer = timer;
272         }
273
274         @Override
275         public void onApplyState(ApplyState applyState) {
276             // Sanity check - we could get an ApplyState from a previous operation that timed out so make
277             // sure it's meant for us.
278             if(operationContext.getContextId().equals(applyState.getIdentifier())) {
279                 LOG.info("{}: {} has been successfully replicated to a majority of followers", raftActor.getId(),
280                         applyState.getReplicatedLogEntry().getData());
281
282                 timer.cancel();
283                 operationComplete(operationContext, null);
284             }
285         }
286
287         @Override
288         public void onServerOperationTimeout(ServerOperationTimeout timeout) {
289             LOG.warn("{}: Timeout occured while replicating the new server configuration for {}", raftContext.getId(),
290                     timeout.getServerId());
291
292             timedOut = true;
293
294             // Fail any pending operations
295             ServerOperationContext<?> nextOperation = pendingOperationsQueue.poll();
296             while(nextOperation != null) {
297                 sendReply(nextOperation, ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT);
298                 nextOperation = pendingOperationsQueue.poll();
299             }
300         }
301
302         @Override
303         public void onNewOperation(ServerOperationContext<?> operationContext) {
304             if(timedOut) {
305                 sendReply(operationContext, ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT);
306             } else {
307                 super.onNewOperation(operationContext);
308             }
309         }
310     }
311
312     /**
313      * Abstract base class for an AddServer operation state.
314      */
315     private abstract class AddServerState extends AbstractOperationState {
316         private final AddServerContext addServerContext;
317
318         AddServerState(AddServerContext addServerContext) {
319             this.addServerContext = addServerContext;
320         }
321
322         AddServerContext getAddServerContext() {
323             return addServerContext;
324         }
325
326         Cancellable newInstallSnapshotTimer() {
327             return newTimer(new ServerOperationTimeout(addServerContext.getOperation().getNewServerId()));
328         }
329
330         void handleInstallSnapshotTimeout(ServerOperationTimeout timeout) {
331             String serverId = timeout.getServerId();
332
333             LOG.debug("{}: handleInstallSnapshotTimeout for new server {}", raftContext.getId(), serverId);
334
335             // cleanup
336             raftContext.removePeer(serverId);
337
338             boolean isLeader = raftActor.isLeader();
339             if(isLeader) {
340                 AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
341                 leader.removeFollower(serverId);
342             }
343
344             operationComplete(getAddServerContext(), isLeader ? ServerChangeStatus.TIMEOUT : ServerChangeStatus.NO_LEADER);
345         }
346
347     }
348
349     /**
350      * The initial state for the AddServer operation. It adds the new follower as a peer and initiates
351      * snapshot capture, if necessary.
352      */
353     private class InitialAddServerState extends AddServerState implements InitialOperationState {
354         InitialAddServerState(AddServerContext addServerContext) {
355             super(addServerContext);
356         }
357
358         @Override
359         public void initiate() {
360             AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
361             AddServer addServer = getAddServerContext().getOperation();
362
363             LOG.debug("{}: Initiating {}", raftContext.getId(), addServer);
364
365             if(raftContext.getPeerInfo(addServer.getNewServerId()) != null) {
366                 operationComplete(getAddServerContext(), ServerChangeStatus.ALREADY_EXISTS);
367                 return;
368             }
369
370             VotingState votingState = addServer.isVotingMember() ? VotingState.VOTING_NOT_INITIALIZED :
371                     VotingState.NON_VOTING;
372             raftContext.addToPeers(addServer.getNewServerId(), addServer.getNewServerAddress(), votingState);
373
374             leader.addFollower(addServer.getNewServerId());
375
376             if(votingState == VotingState.VOTING_NOT_INITIALIZED){
377                 // schedule the install snapshot timeout timer
378                 Cancellable installSnapshotTimer = newInstallSnapshotTimer();
379                 if(leader.initiateCaptureSnapshot(addServer.getNewServerId())) {
380                     LOG.debug("{}: Initiating capture snapshot for new server {}", raftContext.getId(),
381                             addServer.getNewServerId());
382
383                     currentOperationState = new InstallingSnapshot(getAddServerContext(), installSnapshotTimer);
384                 } else {
385                     LOG.debug("{}: Snapshot already in progress - waiting for completion", raftContext.getId());
386
387                     currentOperationState = new WaitingForPriorSnapshotComplete(getAddServerContext(),
388                             installSnapshotTimer);
389                 }
390             } else {
391                 LOG.debug("{}: New follower is non-voting - directly persisting new server configuration",
392                         raftContext.getId());
393
394                 persistNewServerConfiguration(getAddServerContext());
395             }
396         }
397     }
398
399     /**
400      * The AddServer operation state for when the catch-up snapshot is being installed. It handles successful
401      * reply or timeout.
402      */
403     private class InstallingSnapshot extends AddServerState {
404         private final Cancellable installSnapshotTimer;
405
406         InstallingSnapshot(AddServerContext addServerContext, Cancellable installSnapshotTimer) {
407             super(addServerContext);
408             this.installSnapshotTimer = Preconditions.checkNotNull(installSnapshotTimer);
409         }
410
411         @Override
412         public void onServerOperationTimeout(ServerOperationTimeout timeout) {
413             handleInstallSnapshotTimeout(timeout);
414
415             LOG.warn("{}: Timeout occured for new server {} while installing snapshot", raftContext.getId(),
416                     timeout.getServerId());
417         }
418
419         @Override
420         public void onUnInitializedFollowerSnapshotReply(UnInitializedFollowerSnapshotReply reply) {
421             LOG.debug("{}: onUnInitializedFollowerSnapshotReply: {}", raftContext.getId(), reply);
422
423             String followerId = reply.getFollowerId();
424
425             // Sanity check to guard against receiving an UnInitializedFollowerSnapshotReply from a prior
426             // add server operation that timed out.
427             if(getAddServerContext().getOperation().getNewServerId().equals(followerId) && raftActor.isLeader()) {
428                 AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
429                 raftContext.getPeerInfo(followerId).setVotingState(VotingState.VOTING);
430                 leader.updateMinReplicaCount();
431
432                 persistNewServerConfiguration(getAddServerContext());
433
434                 installSnapshotTimer.cancel();
435             } else {
436                 LOG.debug("{}: Dropping UnInitializedFollowerSnapshotReply for server {}: {}",
437                         raftContext.getId(), followerId,
438                         !raftActor.isLeader() ? "not leader" : "server Id doesn't match");
439             }
440         }
441     }
442
443     /**
444      * The AddServer operation state for when there is a snapshot already in progress. When the current
445      * snapshot completes, it initiates an install snapshot.
446      */
447     private class WaitingForPriorSnapshotComplete extends AddServerState {
448         private final Cancellable snapshotTimer;
449
450         WaitingForPriorSnapshotComplete(AddServerContext addServerContext, Cancellable snapshotTimer) {
451             super(addServerContext);
452             this.snapshotTimer = Preconditions.checkNotNull(snapshotTimer);
453         }
454
455         @Override
456         public void onSnapshotComplete() {
457             LOG.debug("{}: onSnapshotComplete", raftContext.getId());
458
459             if(!raftActor.isLeader()) {
460                 LOG.debug("{}: No longer the leader", raftContext.getId());
461                 return;
462             }
463
464             AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
465             if(leader.initiateCaptureSnapshot(getAddServerContext().getOperation().getNewServerId())) {
466                 LOG.debug("{}: Initiating capture snapshot for new server {}", raftContext.getId(),
467                         getAddServerContext().getOperation().getNewServerId());
468
469                 currentOperationState = new InstallingSnapshot(getAddServerContext(),
470                         newInstallSnapshotTimer());
471
472                 snapshotTimer.cancel();
473             }
474         }
475
476         @Override
477         public void onServerOperationTimeout(ServerOperationTimeout timeout) {
478             handleInstallSnapshotTimeout(timeout);
479
480             LOG.warn("{}: Timeout occured for new server {} while waiting for prior snapshot to complete",
481                     raftContext.getId(), timeout.getServerId());
482         }
483     }
484
485     /**
486      * Stores context information for a server operation.
487      *
488      * @param <T> the operation type
489      */
490     private static abstract class ServerOperationContext<T> {
491         private final T operation;
492         private final ActorRef clientRequestor;
493         private final String contextId;
494
495         ServerOperationContext(T operation, ActorRef clientRequestor){
496             this.operation = operation;
497             this.clientRequestor = clientRequestor;
498             contextId = UUID.randomUUID().toString();
499         }
500
501         String getContextId() {
502             return contextId;
503         }
504
505         T getOperation() {
506             return operation;
507         }
508
509         ActorRef getClientRequestor() {
510             return clientRequestor;
511         }
512
513         abstract Object newReply(ServerChangeStatus status, String leaderId);
514
515         abstract InitialOperationState newInitialOperationState(RaftActorServerConfigurationSupport support);
516
517         abstract void operationComplete(RaftActor raftActor, ServerChangeStatus serverChangeStatus);
518
519         abstract String getServerId();
520     }
521
522     /**
523      * Stores context information for an AddServer operation.
524      */
525     private static class AddServerContext extends ServerOperationContext<AddServer> {
526         AddServerContext(AddServer addServer, ActorRef clientRequestor) {
527             super(addServer, clientRequestor);
528         }
529
530         @Override
531         Object newReply(ServerChangeStatus status, String leaderId) {
532             return new AddServerReply(status, leaderId);
533         }
534
535         @Override
536         InitialOperationState newInitialOperationState(RaftActorServerConfigurationSupport support) {
537             return support.new InitialAddServerState(this);
538         }
539
540         @Override
541         void operationComplete(RaftActor raftActor, ServerChangeStatus serverChangeStatus) {
542
543         }
544
545         @Override
546         String getServerId() {
547             return getOperation().getNewServerId();
548         }
549     }
550
551     private abstract class RemoveServerState extends AbstractOperationState {
552         private final RemoveServerContext removeServerContext;
553
554
555         protected RemoveServerState(RemoveServerContext removeServerContext) {
556             this.removeServerContext = Preconditions.checkNotNull(removeServerContext);
557
558         }
559
560         public RemoveServerContext getRemoveServerContext() {
561             return removeServerContext;
562         }
563     }
564
565     private class InitialRemoveServerState extends RemoveServerState implements InitialOperationState{
566
567         protected InitialRemoveServerState(RemoveServerContext removeServerContext) {
568             super(removeServerContext);
569         }
570
571         @Override
572         public void initiate() {
573             raftContext.removePeer(getRemoveServerContext().getOperation().getServerId());
574             persistNewServerConfiguration(getRemoveServerContext());
575         }
576     }
577
578     private static class RemoveServerContext extends ServerOperationContext<RemoveServer> {
579         private final String peerAddress;
580
581         RemoveServerContext(RemoveServer operation, String peerAddress, ActorRef clientRequestor) {
582             super(operation, clientRequestor);
583             this.peerAddress = peerAddress;
584         }
585
586         @Override
587         Object newReply(ServerChangeStatus status, String leaderId) {
588             return new RemoveServerReply(status, leaderId);
589         }
590
591         @Override
592         InitialOperationState newInitialOperationState(RaftActorServerConfigurationSupport support) {
593             return support.new InitialRemoveServerState(this);
594         }
595
596         @Override
597         void operationComplete(RaftActor raftActor, ServerChangeStatus serverChangeStatus) {
598             if(peerAddress != null) {
599                 raftActor.context().actorSelection(peerAddress).tell(new ServerRemoved(getOperation().getServerId()), raftActor.getSelf());
600             }
601         }
602
603         @Override
604         String getServerId() {
605             return getOperation().getServerId();
606         }
607     }
608
609     static class ServerOperationTimeout {
610         private final String serverId;
611
612         ServerOperationTimeout(String serverId){
613            this.serverId = Preconditions.checkNotNull(serverId, "serverId should not be null");
614         }
615
616         String getServerId() {
617             return serverId;
618         }
619     }
620 }