Turn OperationState into an abstract class
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActorServerConfigurationSupport.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import akka.actor.ActorRef;
11 import akka.actor.ActorSelection;
12 import akka.actor.Cancellable;
13 import com.google.common.base.Preconditions;
14 import java.util.ArrayDeque;
15 import java.util.Queue;
16 import java.util.UUID;
17 import javax.annotation.Nullable;
18 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
19 import org.opendaylight.controller.cluster.raft.base.messages.SnapshotComplete;
20 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
21 import org.opendaylight.controller.cluster.raft.messages.AddServer;
22 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
23 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
24 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
25 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
26 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
27 import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
28 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31
32 /**
33  * Handles server configuration related messages for a RaftActor.
34  *
35  * @author Thomas Pantelis
36  */
37 class RaftActorServerConfigurationSupport {
38     private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupport.class);
39
40     private final OperationState IDLE = new Idle();
41
42     private final RaftActor raftActor;
43
44     private final RaftActorContext raftContext;
45
46     private final Queue<ServerOperationContext<?>> pendingOperationsQueue = new ArrayDeque<>();
47
48     private OperationState currentOperationState = IDLE;
49
50     RaftActorServerConfigurationSupport(RaftActor raftActor) {
51         this.raftActor = raftActor;
52         this.raftContext = raftActor.getRaftActorContext();
53     }
54
55     boolean handleMessage(Object message, ActorRef sender) {
56         if(message instanceof AddServer) {
57             onAddServer((AddServer) message, sender);
58             return true;
59         } else if(message instanceof RemoveServer) {
60             onRemoveServer((RemoveServer) message, sender);
61             return true;
62         } else if (message instanceof ServerOperationTimeout) {
63             currentOperationState.onServerOperationTimeout((ServerOperationTimeout) message);
64             return true;
65         } else if (message instanceof UnInitializedFollowerSnapshotReply) {
66             currentOperationState.onUnInitializedFollowerSnapshotReply((UnInitializedFollowerSnapshotReply) message);
67             return true;
68         } else if(message instanceof ApplyState) {
69             return onApplyState((ApplyState) message);
70         } else if(message instanceof SnapshotComplete) {
71             currentOperationState.onSnapshotComplete();
72             return false;
73         } else {
74             return false;
75         }
76     }
77
78     private void onRemoveServer(RemoveServer removeServer, ActorRef sender) {
79         LOG.debug("{}: onRemoveServer: {}, state: {}", raftContext.getId(), removeServer, currentOperationState);
80         boolean isSelf = removeServer.getServerId().equals(raftActor.getId());
81         if(isSelf && !raftContext.hasFollowers()) {
82             sender.tell(new RemoveServerReply(ServerChangeStatus.NOT_SUPPORTED, raftActor.getLeaderId()),
83                     raftActor.getSelf());
84         } else if(!isSelf && !raftContext.getPeerIds().contains(removeServer.getServerId())) {
85             sender.tell(new RemoveServerReply(ServerChangeStatus.DOES_NOT_EXIST, raftActor.getLeaderId()),
86                     raftActor.getSelf());
87         } else {
88             String serverAddress = isSelf ? raftActor.self().path().toString() :
89                 raftContext.getPeerAddress(removeServer.getServerId());
90             onNewOperation(new RemoveServerContext(removeServer, serverAddress, sender));
91         }
92     }
93
94     private boolean onApplyState(ApplyState applyState) {
95         Payload data = applyState.getReplicatedLogEntry().getData();
96         if(data instanceof ServerConfigurationPayload) {
97             currentOperationState.onApplyState(applyState);
98             return true;
99         }
100
101         return false;
102     }
103
104     /**
105      * The algorithm for AddServer is as follows:
106      * <ul>
107      * <li>Add the new server as a peer.</li>
108      * <li>Add the new follower to the leader.</li>
109      * <li>If new server should be voting member</li>
110      * <ul>
111      *     <li>Initialize FollowerState to VOTING_NOT_INITIALIZED.</li>
112      *     <li>Initiate install snapshot to the new follower.</li>
113      *     <li>When install snapshot complete, mark the follower as VOTING and re-calculate majority vote count.</li>
114      * </ul>
115      * <li>Persist and replicate ServerConfigurationPayload with the new server list.</li>
116      * <li>On replication consensus, respond to caller with OK.</li>
117      * </ul>
118      * If the install snapshot times out after a period of 2 * election time out
119      * <ul>
120      *     <li>Remove the new server as a peer.</li>
121      *     <li>Remove the new follower from the leader.</li>
122      *     <li>Respond to caller with TIMEOUT.</li>
123      * </ul>
124      */
125     private void onAddServer(AddServer addServer, ActorRef sender) {
126         LOG.debug("{}: onAddServer: {}, state: {}", raftContext.getId(), addServer, currentOperationState);
127
128         onNewOperation(new AddServerContext(addServer, sender));
129     }
130
131     private void onNewOperation(ServerOperationContext<?> operationContext) {
132         if (raftActor.isLeader()) {
133             currentOperationState.onNewOperation(operationContext);
134         } else {
135             ActorSelection leader = raftActor.getLeader();
136             if (leader != null) {
137                 LOG.debug("{}: Not leader - forwarding to leader {}", raftContext.getId(), leader);
138                 leader.forward(operationContext.getOperation(), raftActor.getContext());
139             } else {
140                 LOG.debug("{}: No leader - returning NO_LEADER reply", raftContext.getId());
141                 operationContext.getClientRequestor().tell(operationContext.newReply(
142                         ServerChangeStatus.NO_LEADER, null), raftActor.self());
143             }
144         }
145     }
146
147     /**
148      * Interface for the initial state for a server operation.
149      */
150     private interface InitialOperationState {
151         void initiate();
152     }
153
154     /**
155      * Abstract base class for a server operation FSM state. Handles common behavior for all states.
156      */
157     private abstract class OperationState {
158         void onNewOperation(ServerOperationContext<?> operationContext) {
159             // We're currently processing another operation so queue it to be processed later.
160
161             LOG.debug("{}: Server operation already in progress - queueing {}", raftContext.getId(),
162                     operationContext.getOperation());
163
164             pendingOperationsQueue.add(operationContext);
165         }
166
167         void onServerOperationTimeout(ServerOperationTimeout timeout) {
168             LOG.debug("onServerOperationTimeout should not be called in state {}", this);
169         }
170
171         void onUnInitializedFollowerSnapshotReply(UnInitializedFollowerSnapshotReply reply) {
172             LOG.debug("onUnInitializedFollowerSnapshotReply was called in state {}", this);
173         }
174
175         void onApplyState(ApplyState applyState) {
176             LOG.debug("onApplyState was called in state {}", this);
177         }
178
179         void onSnapshotComplete() {
180
181         }
182
183         protected void persistNewServerConfiguration(ServerOperationContext<?> operationContext){
184             raftContext.setDynamicServerConfigurationInUse();
185
186             boolean includeSelf = !operationContext.getServerId().equals(raftActor.getId());
187             ServerConfigurationPayload payload = raftContext.getPeerServerInfo(includeSelf);
188             LOG.debug("{}: New server configuration : {}", raftContext.getId(), payload.getServerConfig());
189
190             raftActor.persistData(operationContext.getClientRequestor(), operationContext.getContextId(), payload);
191
192             currentOperationState = new Persisting(operationContext, newTimer(new ServerOperationTimeout(operationContext.getServerId())));
193
194             sendReply(operationContext, ServerChangeStatus.OK);
195         }
196
197         protected void operationComplete(ServerOperationContext<?> operationContext, @Nullable ServerChangeStatus replyStatus) {
198             if(replyStatus != null) {
199                 sendReply(operationContext, replyStatus);
200             }
201
202             operationContext.operationComplete(raftActor, replyStatus);
203
204             currentOperationState = IDLE;
205
206             ServerOperationContext<?> nextOperation = pendingOperationsQueue.poll();
207             if(nextOperation != null) {
208                 RaftActorServerConfigurationSupport.this.onNewOperation(nextOperation);
209             }
210         }
211
212         protected void sendReply(ServerOperationContext<?> operationContext, ServerChangeStatus status) {
213             LOG.debug("{}: Returning {} for operation {}", raftContext.getId(), status, operationContext.getOperation());
214
215             operationContext.getClientRequestor().tell(operationContext.newReply(status, raftActor.getLeaderId()),
216                     raftActor.self());
217         }
218
219         Cancellable newTimer(Object message) {
220             return raftContext.getActorSystem().scheduler().scheduleOnce(
221                     raftContext.getConfigParams().getElectionTimeOutInterval().$times(2), raftContext.getActor(), message,
222                             raftContext.getActorSystem().dispatcher(), raftContext.getActor());
223         }
224
225         @Override
226         public String toString() {
227             return getClass().getSimpleName();
228         }
229     }
230
231     /**
232      * The state when no server operation is in progress. It immediately initiates new server operations.
233      */
234     private final class Idle extends OperationState {
235         @Override
236         public void onNewOperation(ServerOperationContext<?> operationContext) {
237             operationContext.newInitialOperationState(RaftActorServerConfigurationSupport.this).initiate();
238         }
239
240         @Override
241         public void onApplyState(ApplyState applyState) {
242             // Noop - we override b/c ApplyState is called normally for followers in the idle state.
243         }
244     }
245
246     /**
247      * The state when a new server configuration is being persisted and replicated.
248      */
249     private final class Persisting extends OperationState {
250         private final ServerOperationContext<?> operationContext;
251         private final Cancellable timer;
252         private boolean timedOut = false;
253
254         Persisting(ServerOperationContext<?> operationContext, Cancellable timer) {
255             this.operationContext = operationContext;
256             this.timer = timer;
257         }
258
259         @Override
260         public void onApplyState(ApplyState applyState) {
261             // Sanity check - we could get an ApplyState from a previous operation that timed out so make
262             // sure it's meant for us.
263             if(operationContext.getContextId().equals(applyState.getIdentifier())) {
264                 LOG.info("{}: {} has been successfully replicated to a majority of followers", raftActor.getId(),
265                         applyState.getReplicatedLogEntry().getData());
266
267                 timer.cancel();
268                 operationComplete(operationContext, null);
269             }
270         }
271
272         @Override
273         public void onServerOperationTimeout(ServerOperationTimeout timeout) {
274             LOG.warn("{}: Timeout occured while replicating the new server configuration for {}", raftContext.getId(),
275                     timeout.getServerId());
276
277             timedOut = true;
278
279             // Fail any pending operations
280             ServerOperationContext<?> nextOperation = pendingOperationsQueue.poll();
281             while(nextOperation != null) {
282                 sendReply(nextOperation, ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT);
283                 nextOperation = pendingOperationsQueue.poll();
284             }
285         }
286
287         @Override
288         public void onNewOperation(ServerOperationContext<?> operationContext) {
289             if(timedOut) {
290                 sendReply(operationContext, ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT);
291             } else {
292                 super.onNewOperation(operationContext);
293             }
294         }
295     }
296
297     /**
298      * Abstract base class for an AddServer operation state.
299      */
300     private abstract class AddServerState extends OperationState {
301         private final AddServerContext addServerContext;
302
303         AddServerState(AddServerContext addServerContext) {
304             this.addServerContext = addServerContext;
305         }
306
307         AddServerContext getAddServerContext() {
308             return addServerContext;
309         }
310
311         Cancellable newInstallSnapshotTimer() {
312             return newTimer(new ServerOperationTimeout(addServerContext.getOperation().getNewServerId()));
313         }
314
315         void handleInstallSnapshotTimeout(ServerOperationTimeout timeout) {
316             String serverId = timeout.getServerId();
317
318             LOG.debug("{}: handleInstallSnapshotTimeout for new server {}", raftContext.getId(), serverId);
319
320             // cleanup
321             raftContext.removePeer(serverId);
322
323             boolean isLeader = raftActor.isLeader();
324             if(isLeader) {
325                 AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
326                 leader.removeFollower(serverId);
327             }
328
329             operationComplete(getAddServerContext(), isLeader ? ServerChangeStatus.TIMEOUT : ServerChangeStatus.NO_LEADER);
330         }
331
332     }
333
334     /**
335      * The initial state for the AddServer operation. It adds the new follower as a peer and initiates
336      * snapshot capture, if necessary.
337      */
338     private final class InitialAddServerState extends AddServerState implements InitialOperationState {
339         InitialAddServerState(AddServerContext addServerContext) {
340             super(addServerContext);
341         }
342
343         @Override
344         public void initiate() {
345             AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
346             AddServer addServer = getAddServerContext().getOperation();
347
348             LOG.debug("{}: Initiating {}", raftContext.getId(), addServer);
349
350             if(raftContext.getPeerInfo(addServer.getNewServerId()) != null) {
351                 operationComplete(getAddServerContext(), ServerChangeStatus.ALREADY_EXISTS);
352                 return;
353             }
354
355             VotingState votingState = addServer.isVotingMember() ? VotingState.VOTING_NOT_INITIALIZED :
356                     VotingState.NON_VOTING;
357             raftContext.addToPeers(addServer.getNewServerId(), addServer.getNewServerAddress(), votingState);
358
359             leader.addFollower(addServer.getNewServerId());
360
361             if(votingState == VotingState.VOTING_NOT_INITIALIZED){
362                 // schedule the install snapshot timeout timer
363                 Cancellable installSnapshotTimer = newInstallSnapshotTimer();
364                 if(leader.initiateCaptureSnapshot(addServer.getNewServerId())) {
365                     LOG.debug("{}: Initiating capture snapshot for new server {}", raftContext.getId(),
366                             addServer.getNewServerId());
367
368                     currentOperationState = new InstallingSnapshot(getAddServerContext(), installSnapshotTimer);
369                 } else {
370                     LOG.debug("{}: Snapshot already in progress - waiting for completion", raftContext.getId());
371
372                     currentOperationState = new WaitingForPriorSnapshotComplete(getAddServerContext(),
373                             installSnapshotTimer);
374                 }
375             } else {
376                 LOG.debug("{}: New follower is non-voting - directly persisting new server configuration",
377                         raftContext.getId());
378
379                 persistNewServerConfiguration(getAddServerContext());
380             }
381         }
382     }
383
384     /**
385      * The AddServer operation state for when the catch-up snapshot is being installed. It handles successful
386      * reply or timeout.
387      */
388     private final class InstallingSnapshot extends AddServerState {
389         private final Cancellable installSnapshotTimer;
390
391         InstallingSnapshot(AddServerContext addServerContext, Cancellable installSnapshotTimer) {
392             super(addServerContext);
393             this.installSnapshotTimer = Preconditions.checkNotNull(installSnapshotTimer);
394         }
395
396         @Override
397         public void onServerOperationTimeout(ServerOperationTimeout timeout) {
398             handleInstallSnapshotTimeout(timeout);
399
400             LOG.warn("{}: Timeout occured for new server {} while installing snapshot", raftContext.getId(),
401                     timeout.getServerId());
402         }
403
404         @Override
405         public void onUnInitializedFollowerSnapshotReply(UnInitializedFollowerSnapshotReply reply) {
406             LOG.debug("{}: onUnInitializedFollowerSnapshotReply: {}", raftContext.getId(), reply);
407
408             String followerId = reply.getFollowerId();
409
410             // Sanity check to guard against receiving an UnInitializedFollowerSnapshotReply from a prior
411             // add server operation that timed out.
412             if(getAddServerContext().getOperation().getNewServerId().equals(followerId) && raftActor.isLeader()) {
413                 AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
414                 raftContext.getPeerInfo(followerId).setVotingState(VotingState.VOTING);
415                 leader.updateMinReplicaCount();
416
417                 persistNewServerConfiguration(getAddServerContext());
418
419                 installSnapshotTimer.cancel();
420             } else {
421                 LOG.debug("{}: Dropping UnInitializedFollowerSnapshotReply for server {}: {}",
422                         raftContext.getId(), followerId,
423                         !raftActor.isLeader() ? "not leader" : "server Id doesn't match");
424             }
425         }
426     }
427
428     /**
429      * The AddServer operation state for when there is a snapshot already in progress. When the current
430      * snapshot completes, it initiates an install snapshot.
431      */
432     private final class WaitingForPriorSnapshotComplete extends AddServerState {
433         private final Cancellable snapshotTimer;
434
435         WaitingForPriorSnapshotComplete(AddServerContext addServerContext, Cancellable snapshotTimer) {
436             super(addServerContext);
437             this.snapshotTimer = Preconditions.checkNotNull(snapshotTimer);
438         }
439
440         @Override
441         public void onSnapshotComplete() {
442             LOG.debug("{}: onSnapshotComplete", raftContext.getId());
443
444             if(!raftActor.isLeader()) {
445                 LOG.debug("{}: No longer the leader", raftContext.getId());
446                 return;
447             }
448
449             AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
450             if(leader.initiateCaptureSnapshot(getAddServerContext().getOperation().getNewServerId())) {
451                 LOG.debug("{}: Initiating capture snapshot for new server {}", raftContext.getId(),
452                         getAddServerContext().getOperation().getNewServerId());
453
454                 currentOperationState = new InstallingSnapshot(getAddServerContext(),
455                         newInstallSnapshotTimer());
456
457                 snapshotTimer.cancel();
458             }
459         }
460
461         @Override
462         public void onServerOperationTimeout(ServerOperationTimeout timeout) {
463             handleInstallSnapshotTimeout(timeout);
464
465             LOG.warn("{}: Timeout occured for new server {} while waiting for prior snapshot to complete",
466                     raftContext.getId(), timeout.getServerId());
467         }
468     }
469
470     /**
471      * Stores context information for a server operation.
472      *
473      * @param <T> the operation type
474      */
475     private static abstract class ServerOperationContext<T> {
476         private final T operation;
477         private final ActorRef clientRequestor;
478         private final String contextId;
479
480         ServerOperationContext(T operation, ActorRef clientRequestor){
481             this.operation = operation;
482             this.clientRequestor = clientRequestor;
483             contextId = UUID.randomUUID().toString();
484         }
485
486         String getContextId() {
487             return contextId;
488         }
489
490         T getOperation() {
491             return operation;
492         }
493
494         ActorRef getClientRequestor() {
495             return clientRequestor;
496         }
497
498         abstract Object newReply(ServerChangeStatus status, String leaderId);
499
500         abstract InitialOperationState newInitialOperationState(RaftActorServerConfigurationSupport support);
501
502         abstract void operationComplete(RaftActor raftActor, ServerChangeStatus serverChangeStatus);
503
504         abstract String getServerId();
505     }
506
507     /**
508      * Stores context information for an AddServer operation.
509      */
510     private static class AddServerContext extends ServerOperationContext<AddServer> {
511         AddServerContext(AddServer addServer, ActorRef clientRequestor) {
512             super(addServer, clientRequestor);
513         }
514
515         @Override
516         Object newReply(ServerChangeStatus status, String leaderId) {
517             return new AddServerReply(status, leaderId);
518         }
519
520         @Override
521         InitialOperationState newInitialOperationState(RaftActorServerConfigurationSupport support) {
522             return support.new InitialAddServerState(this);
523         }
524
525         @Override
526         void operationComplete(RaftActor raftActor, ServerChangeStatus serverChangeStatus) {
527
528         }
529
530         @Override
531         String getServerId() {
532             return getOperation().getNewServerId();
533         }
534     }
535
536     private abstract class RemoveServerState extends OperationState {
537         private final RemoveServerContext removeServerContext;
538
539         protected RemoveServerState(RemoveServerContext removeServerContext) {
540             this.removeServerContext = Preconditions.checkNotNull(removeServerContext);
541
542         }
543
544         public RemoveServerContext getRemoveServerContext() {
545             return removeServerContext;
546         }
547     }
548
549     private final class InitialRemoveServerState extends RemoveServerState implements InitialOperationState{
550
551         protected InitialRemoveServerState(RemoveServerContext removeServerContext) {
552             super(removeServerContext);
553         }
554
555         @Override
556         public void initiate() {
557             String serverId = getRemoveServerContext().getOperation().getServerId();
558             raftContext.removePeer(serverId);
559             ((AbstractLeader)raftActor.getCurrentBehavior()).removeFollower(serverId);
560
561             persistNewServerConfiguration(getRemoveServerContext());
562         }
563     }
564
565     private static class RemoveServerContext extends ServerOperationContext<RemoveServer> {
566         private final String peerAddress;
567
568         RemoveServerContext(RemoveServer operation, String peerAddress, ActorRef clientRequestor) {
569             super(operation, clientRequestor);
570             this.peerAddress = peerAddress;
571         }
572
573         @Override
574         Object newReply(ServerChangeStatus status, String leaderId) {
575             return new RemoveServerReply(status, leaderId);
576         }
577
578         @Override
579         InitialOperationState newInitialOperationState(RaftActorServerConfigurationSupport support) {
580             return support.new InitialRemoveServerState(this);
581         }
582
583         @Override
584         void operationComplete(RaftActor raftActor, ServerChangeStatus serverChangeStatus) {
585             if(peerAddress != null) {
586                 raftActor.context().actorSelection(peerAddress).tell(new ServerRemoved(getOperation().getServerId()), raftActor.getSelf());
587             }
588         }
589
590         @Override
591         String getServerId() {
592             return getOperation().getServerId();
593         }
594     }
595
596     static class ServerOperationTimeout {
597         private final String serverId;
598
599         ServerOperationTimeout(String serverId){
600            this.serverId = Preconditions.checkNotNull(serverId, "serverId should not be null");
601         }
602
603         String getServerId() {
604             return serverId;
605         }
606     }
607 }