Bug 2187: Timeout the Persist state
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActorServerConfigurationSupport.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.Cancellable;
import com.google.common.base.Preconditions;
import java.util.ArrayDeque;
import java.util.LinkedList;
import java.util.Queue;
import java.util.UUID;
import javax.annotation.Nullable;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.SnapshotComplete;
import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
import org.opendaylight.controller.cluster.raft.messages.AddServer;
import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
31
32 /**
33  * Handles server configuration related messages for a RaftActor.
34  *
35  * @author Thomas Pantelis
36  */
37 class RaftActorServerConfigurationSupport {
38     private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupport.class);
39
40     private final OperationState IDLE = new Idle();
41
42     private final RaftActorContext raftContext;
43
44     private final Queue<ServerOperationContext<?>> pendingOperationsQueue = new LinkedList<>();
45
46     private OperationState currentOperationState = IDLE;
47
48     RaftActorServerConfigurationSupport(RaftActorContext context) {
49         this.raftContext = context;
50     }
51
52     boolean handleMessage(Object message, RaftActor raftActor, ActorRef sender) {
53         if(message instanceof AddServer) {
54             onAddServer((AddServer) message, raftActor, sender);
55             return true;
56         } else if(message instanceof RemoveServer) {
57             onRemoveServer((RemoveServer) message, raftActor, sender);
58             return true;
59         } else if (message instanceof ServerOperationTimeout) {
60             currentOperationState.onServerOperationTimeout(raftActor, (ServerOperationTimeout) message);
61             return true;
62         } else if (message instanceof UnInitializedFollowerSnapshotReply) {
63             currentOperationState.onUnInitializedFollowerSnapshotReply(raftActor,
64                     (UnInitializedFollowerSnapshotReply) message);
65             return true;
66         } else if(message instanceof ApplyState) {
67             return onApplyState((ApplyState) message, raftActor);
68         } else if(message instanceof SnapshotComplete) {
69             currentOperationState.onSnapshotComplete(raftActor);
70             return false;
71         } else {
72             return false;
73         }
74     }
75
76     private void onRemoveServer(RemoveServer removeServer, RaftActor raftActor, ActorRef sender) {
77         LOG.debug("{}: onRemoveServer: {}, state: {}", raftContext.getId(), removeServer, currentOperationState);
78         if(removeServer.getServerId().equals(raftActor.getLeaderId())){
79             // Removing current leader is not supported yet
80             // TODO: To properly support current leader removal we need to first implement transfer of leadership
81             LOG.debug("Cannot remove {} replica because it is the Leader", removeServer.getServerId());
82             sender.tell(new RemoveServerReply(ServerChangeStatus.NOT_SUPPORTED, raftActor.getLeaderId()), raftActor.getSelf());
83         } else if(!raftContext.getPeerIds().contains(removeServer.getServerId())) {
84             sender.tell(new RemoveServerReply(ServerChangeStatus.DOES_NOT_EXIST, raftActor.getLeaderId()), raftActor.getSelf());
85         } else {
86             onNewOperation(raftActor, new RemoveServerContext(removeServer, raftContext.getPeerAddress(removeServer.getServerId()), sender));
87         }
88     }
89
90     private boolean onApplyState(ApplyState applyState, RaftActor raftActor) {
91         Payload data = applyState.getReplicatedLogEntry().getData();
92         if(data instanceof ServerConfigurationPayload) {
93             currentOperationState.onApplyState(raftActor, applyState);
94             return true;
95         }
96
97         return false;
98     }
99
100     /**
101      * The algorithm for AddServer is as follows:
102      * <ul>
103      * <li>Add the new server as a peer.</li>
104      * <li>Add the new follower to the leader.</li>
105      * <li>If new server should be voting member</li>
106      * <ul>
107      *     <li>Initialize FollowerState to VOTING_NOT_INITIALIZED.</li>
108      *     <li>Initiate install snapshot to the new follower.</li>
109      *     <li>When install snapshot complete, mark the follower as VOTING and re-calculate majority vote count.</li>
110      * </ul>
111      * <li>Persist and replicate ServerConfigurationPayload with the new server list.</li>
112      * <li>On replication consensus, respond to caller with OK.</li>
113      * </ul>
114      * If the install snapshot times out after a period of 2 * election time out
115      * <ul>
116      *     <li>Remove the new server as a peer.</li>
117      *     <li>Remove the new follower from the leader.</li>
118      *     <li>Respond to caller with TIMEOUT.</li>
119      * </ul>
120      */
121     private void onAddServer(AddServer addServer, RaftActor raftActor, ActorRef sender) {
122         LOG.debug("{}: onAddServer: {}, state: {}", raftContext.getId(), addServer, currentOperationState);
123
124         onNewOperation(raftActor, new AddServerContext(addServer, sender));
125     }
126
127     private void onNewOperation(RaftActor raftActor, ServerOperationContext<?> operationContext) {
128         if (raftActor.isLeader()) {
129             currentOperationState.onNewOperation(raftActor, operationContext);
130         } else {
131             ActorSelection leader = raftActor.getLeader();
132             if (leader != null) {
133                 LOG.debug("{}: Not leader - forwarding to leader {}", raftContext.getId(), leader);
134                 leader.forward(operationContext.getOperation(), raftActor.getContext());
135             } else {
136                 LOG.debug("{}: No leader - returning NO_LEADER reply", raftContext.getId());
137                 operationContext.getClientRequestor().tell(operationContext.newReply(
138                         ServerChangeStatus.NO_LEADER, null), raftActor.self());
139             }
140         }
141     }
142
143     /**
144      * Interface for a server operation FSM state.
145      */
146     private interface OperationState {
147         void onNewOperation(RaftActor raftActor, ServerOperationContext<?> operationContext);
148
149         void onServerOperationTimeout(RaftActor raftActor, ServerOperationTimeout timeout);
150
151         void onUnInitializedFollowerSnapshotReply(RaftActor raftActor, UnInitializedFollowerSnapshotReply reply);
152
153         void onApplyState(RaftActor raftActor, ApplyState applyState);
154
155         void onSnapshotComplete(RaftActor raftActor);
156     }
157
158     /**
159      * Interface for the initial state for a server operation.
160      */
161     private interface InitialOperationState {
162         void initiate(RaftActor raftActor);
163     }
164
165     /**
166      * Abstract base class for a server operation FSM state. Handles common behavior for all states.
167      */
168     private abstract class AbstractOperationState implements OperationState {
169         @Override
170         public void onNewOperation(RaftActor raftActor, ServerOperationContext<?> operationContext) {
171             // We're currently processing another operation so queue it to be processed later.
172
173             LOG.debug("{}: Server operation already in progress - queueing {}", raftContext.getId(),
174                     operationContext.getOperation());
175
176             pendingOperationsQueue.add(operationContext);
177         }
178
179         @Override
180         public void onServerOperationTimeout(RaftActor raftActor, ServerOperationTimeout timeout) {
181             LOG.debug("onServerOperationTimeout should not be called in state {}", this);
182         }
183
184         @Override
185         public void onUnInitializedFollowerSnapshotReply(RaftActor raftActor, UnInitializedFollowerSnapshotReply reply) {
186             LOG.debug("onUnInitializedFollowerSnapshotReply was called in state {}", this);
187         }
188
189         @Override
190         public void onApplyState(RaftActor raftActor, ApplyState applyState) {
191             LOG.debug("onApplyState was called in state {}", this);
192         }
193
194         @Override
195         public void onSnapshotComplete(RaftActor raftActor) {
196         }
197
198         protected void persistNewServerConfiguration(RaftActor raftActor, ServerOperationContext<?> operationContext){
199             raftContext.setDynamicServerConfigurationInUse();
200             ServerConfigurationPayload payload = raftContext.getPeerServerInfo();
201             LOG.debug("{}: New server configuration : {}", raftContext.getId(), payload.getServerConfig());
202
203             raftActor.persistData(operationContext.getClientRequestor(), operationContext.getContextId(), payload);
204
205             currentOperationState = new Persisting(operationContext, newTimer(
206                     new ServerOperationTimeout(operationContext.getServerId())));
207
208             sendReply(raftActor, operationContext, ServerChangeStatus.OK);
209         }
210
211         protected void operationComplete(RaftActor raftActor, ServerOperationContext<?> operationContext,
212                 @Nullable ServerChangeStatus replyStatus) {
213             if(replyStatus != null) {
214                 sendReply(raftActor, operationContext, replyStatus);
215             }
216
217             operationContext.operationComplete(raftActor, replyStatus);
218
219             currentOperationState = IDLE;
220
221             ServerOperationContext<?> nextOperation = pendingOperationsQueue.poll();
222             if(nextOperation != null) {
223                 RaftActorServerConfigurationSupport.this.onNewOperation(raftActor, nextOperation);
224             }
225         }
226
227         protected void sendReply(RaftActor raftActor, ServerOperationContext<?> operationContext,
228                 ServerChangeStatus status) {
229             LOG.debug("{}: Returning {} for operation {}", raftContext.getId(), status, operationContext.getOperation());
230
231             operationContext.getClientRequestor().tell(operationContext.newReply(status, raftActor.getLeaderId()),
232                     raftActor.self());
233         }
234
235         Cancellable newTimer(Object message) {
236             return raftContext.getActorSystem().scheduler().scheduleOnce(
237                     raftContext.getConfigParams().getElectionTimeOutInterval().$times(2), raftContext.getActor(),
238                             message, raftContext.getActorSystem().dispatcher(), raftContext.getActor());
239         }
240
241         @Override
242         public String toString() {
243             return getClass().getSimpleName();
244         }
245     }
246
247     /**
248      * The state when no server operation is in progress. It immediately initiates new server operations.
249      */
250     private class Idle extends AbstractOperationState {
251         @Override
252         public void onNewOperation(RaftActor raftActor, ServerOperationContext<?> operationContext) {
253             operationContext.newInitialOperationState(RaftActorServerConfigurationSupport.this).initiate(raftActor);
254         }
255
256         @Override
257         public void onApplyState(RaftActor raftActor, ApplyState applyState) {
258             // Noop - we override b/c ApplyState is called normally for followers in the idle state.
259         }
260     }
261
262     /**
263      * The state when a new server configuration is being persisted and replicated.
264      */
265     private class Persisting extends AbstractOperationState {
266         private final ServerOperationContext<?> operationContext;
267         private final Cancellable timer;
268         private boolean timedOut = false;
269
270         Persisting(ServerOperationContext<?> operationContext, Cancellable timer) {
271             this.operationContext = operationContext;
272             this.timer = timer;
273         }
274
275         @Override
276         public void onApplyState(RaftActor raftActor, ApplyState applyState) {
277             // Sanity check - we could get an ApplyState from a previous operation that timed out so make
278             // sure it's meant for us.
279             if(operationContext.getContextId().equals(applyState.getIdentifier())) {
280                 LOG.info("{}: {} has been successfully replicated to a majority of followers", raftActor.getId(),
281                         applyState.getReplicatedLogEntry().getData());
282
283                 timer.cancel();
284                 operationComplete(raftActor, operationContext, null);
285             }
286         }
287
288         @Override
289         public void onServerOperationTimeout(RaftActor raftActor, ServerOperationTimeout timeout) {
290             LOG.warn("{}: Timeout occured while replicating the new server configuration for {}", raftContext.getId(),
291                     timeout.getServerId());
292
293             timedOut = true;
294
295             // Fail any pending operations
296             ServerOperationContext<?> nextOperation = pendingOperationsQueue.poll();
297             while(nextOperation != null) {
298                 sendReply(raftActor, nextOperation, ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT);
299                 nextOperation = pendingOperationsQueue.poll();
300             }
301         }
302
303         @Override
304         public void onNewOperation(RaftActor raftActor, ServerOperationContext<?> operationContext) {
305             if(timedOut) {
306                 sendReply(raftActor, operationContext, ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT);
307             } else {
308                 super.onNewOperation(raftActor, operationContext);
309             }
310         }
311     }
312
313     /**
314      * Abstract base class for an AddServer operation state.
315      */
316     private abstract class AddServerState extends AbstractOperationState {
317         private final AddServerContext addServerContext;
318
319         AddServerState(AddServerContext addServerContext) {
320             this.addServerContext = addServerContext;
321         }
322
323         AddServerContext getAddServerContext() {
324             return addServerContext;
325         }
326
327         Cancellable newInstallSnapshotTimer(RaftActor raftActor) {
328             return newTimer(new ServerOperationTimeout(addServerContext.getOperation().getNewServerId()));
329         }
330
331         void handleInstallSnapshotTimeout(RaftActor raftActor, ServerOperationTimeout timeout) {
332             String serverId = timeout.getServerId();
333
334             LOG.debug("{}: handleInstallSnapshotTimeout for new server {}", raftContext.getId(), serverId);
335
336             // cleanup
337             raftContext.removePeer(serverId);
338
339             boolean isLeader = raftActor.isLeader();
340             if(isLeader) {
341                 AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
342                 leader.removeFollower(serverId);
343             }
344
345             operationComplete(raftActor, getAddServerContext(),
346                     isLeader ? ServerChangeStatus.TIMEOUT : ServerChangeStatus.NO_LEADER);
347         }
348
349     }
350
351     /**
352      * The initial state for the AddServer operation. It adds the new follower as a peer and initiates
353      * snapshot capture, if necessary.
354      */
355     private class InitialAddServerState extends AddServerState implements InitialOperationState {
356         InitialAddServerState(AddServerContext addServerContext) {
357             super(addServerContext);
358         }
359
360         @Override
361         public void initiate(RaftActor raftActor) {
362             AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
363             AddServer addServer = getAddServerContext().getOperation();
364
365             LOG.debug("{}: Initiating {}", raftContext.getId(), addServer);
366
367             if(raftContext.getPeerInfo(addServer.getNewServerId()) != null) {
368                 operationComplete(raftActor, getAddServerContext(), ServerChangeStatus.ALREADY_EXISTS);
369                 return;
370             }
371
372             VotingState votingState = addServer.isVotingMember() ? VotingState.VOTING_NOT_INITIALIZED :
373                     VotingState.NON_VOTING;
374             raftContext.addToPeers(addServer.getNewServerId(), addServer.getNewServerAddress(), votingState);
375
376             leader.addFollower(addServer.getNewServerId());
377
378             if(votingState == VotingState.VOTING_NOT_INITIALIZED){
379                 // schedule the install snapshot timeout timer
380                 Cancellable installSnapshotTimer = newInstallSnapshotTimer(raftActor);
381                 if(leader.initiateCaptureSnapshot(addServer.getNewServerId())) {
382                     LOG.debug("{}: Initiating capture snapshot for new server {}", raftContext.getId(),
383                             addServer.getNewServerId());
384
385                     currentOperationState = new InstallingSnapshot(getAddServerContext(), installSnapshotTimer);
386                 } else {
387                     LOG.debug("{}: Snapshot already in progress - waiting for completion", raftContext.getId());
388
389                     currentOperationState = new WaitingForPriorSnapshotComplete(getAddServerContext(),
390                             installSnapshotTimer);
391                 }
392             } else {
393                 LOG.debug("{}: New follower is non-voting - directly persisting new server configuration",
394                         raftContext.getId());
395
396                 persistNewServerConfiguration(raftActor, getAddServerContext());
397             }
398         }
399     }
400
401     /**
402      * The AddServer operation state for when the catch-up snapshot is being installed. It handles successful
403      * reply or timeout.
404      */
405     private class InstallingSnapshot extends AddServerState {
406         private final Cancellable installSnapshotTimer;
407
408         InstallingSnapshot(AddServerContext addServerContext, Cancellable installSnapshotTimer) {
409             super(addServerContext);
410             this.installSnapshotTimer = Preconditions.checkNotNull(installSnapshotTimer);
411         }
412
413         @Override
414         public void onServerOperationTimeout(RaftActor raftActor, ServerOperationTimeout timeout) {
415             handleInstallSnapshotTimeout(raftActor, timeout);
416
417             LOG.warn("{}: Timeout occured for new server {} while installing snapshot", raftContext.getId(),
418                     timeout.getServerId());
419         }
420
421         @Override
422         public void onUnInitializedFollowerSnapshotReply(RaftActor raftActor, UnInitializedFollowerSnapshotReply reply) {
423             LOG.debug("{}: onUnInitializedFollowerSnapshotReply: {}", raftContext.getId(), reply);
424
425             String followerId = reply.getFollowerId();
426
427             // Sanity check to guard against receiving an UnInitializedFollowerSnapshotReply from a prior
428             // add server operation that timed out.
429             if(getAddServerContext().getOperation().getNewServerId().equals(followerId) && raftActor.isLeader()) {
430                 AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
431                 raftContext.getPeerInfo(followerId).setVotingState(VotingState.VOTING);
432                 leader.updateMinReplicaCount();
433
434                 persistNewServerConfiguration(raftActor, getAddServerContext());
435
436                 installSnapshotTimer.cancel();
437             } else {
438                 LOG.debug("{}: Dropping UnInitializedFollowerSnapshotReply for server {}: {}",
439                         raftContext.getId(), followerId,
440                         !raftActor.isLeader() ? "not leader" : "server Id doesn't match");
441             }
442         }
443     }
444
445     /**
446      * The AddServer operation state for when there is a snapshot already in progress. When the current
447      * snapshot completes, it initiates an install snapshot.
448      */
449     private class WaitingForPriorSnapshotComplete extends AddServerState {
450         private final Cancellable snapshotTimer;
451
452         WaitingForPriorSnapshotComplete(AddServerContext addServerContext, Cancellable snapshotTimer) {
453             super(addServerContext);
454             this.snapshotTimer = Preconditions.checkNotNull(snapshotTimer);
455         }
456
457         @Override
458         public void onSnapshotComplete(RaftActor raftActor) {
459             LOG.debug("{}: onSnapshotComplete", raftContext.getId());
460
461             if(!raftActor.isLeader()) {
462                 LOG.debug("{}: No longer the leader", raftContext.getId());
463                 return;
464             }
465
466             AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
467             if(leader.initiateCaptureSnapshot(getAddServerContext().getOperation().getNewServerId())) {
468                 LOG.debug("{}: Initiating capture snapshot for new server {}", raftContext.getId(),
469                         getAddServerContext().getOperation().getNewServerId());
470
471                 currentOperationState = new InstallingSnapshot(getAddServerContext(),
472                         newInstallSnapshotTimer(raftActor));
473
474                 snapshotTimer.cancel();
475             }
476         }
477
478         @Override
479         public void onServerOperationTimeout(RaftActor raftActor, ServerOperationTimeout timeout) {
480             handleInstallSnapshotTimeout(raftActor, timeout);
481
482             LOG.warn("{}: Timeout occured for new server {} while waiting for prior snapshot to complete",
483                     raftContext.getId(), timeout.getServerId());
484         }
485     }
486
487     /**
488      * Stores context information for a server operation.
489      *
490      * @param <T> the operation type
491      */
492     private static abstract class ServerOperationContext<T> {
493         private final T operation;
494         private final ActorRef clientRequestor;
495         private final String contextId;
496
497         ServerOperationContext(T operation, ActorRef clientRequestor){
498             this.operation = operation;
499             this.clientRequestor = clientRequestor;
500             contextId = UUID.randomUUID().toString();
501         }
502
503         String getContextId() {
504             return contextId;
505         }
506
507         T getOperation() {
508             return operation;
509         }
510
511         ActorRef getClientRequestor() {
512             return clientRequestor;
513         }
514
515         abstract Object newReply(ServerChangeStatus status, String leaderId);
516
517         abstract InitialOperationState newInitialOperationState(RaftActorServerConfigurationSupport support);
518
519         abstract void operationComplete(RaftActor raftActor, ServerChangeStatus serverChangeStatus);
520
521         abstract String getServerId();
522     }
523
524     /**
525      * Stores context information for an AddServer operation.
526      */
527     private static class AddServerContext extends ServerOperationContext<AddServer> {
528         AddServerContext(AddServer addServer, ActorRef clientRequestor) {
529             super(addServer, clientRequestor);
530         }
531
532         @Override
533         Object newReply(ServerChangeStatus status, String leaderId) {
534             return new AddServerReply(status, leaderId);
535         }
536
537         @Override
538         InitialOperationState newInitialOperationState(RaftActorServerConfigurationSupport support) {
539             return support.new InitialAddServerState(this);
540         }
541
542         @Override
543         void operationComplete(RaftActor raftActor, ServerChangeStatus serverChangeStatus) {
544
545         }
546
547         @Override
548         String getServerId() {
549             return getOperation().getNewServerId();
550         }
551     }
552
553     private abstract class RemoveServerState extends AbstractOperationState {
554         private final RemoveServerContext removeServerContext;
555
556
557         protected RemoveServerState(RemoveServerContext removeServerContext) {
558             this.removeServerContext = Preconditions.checkNotNull(removeServerContext);
559
560         }
561
562         public RemoveServerContext getRemoveServerContext() {
563             return removeServerContext;
564         }
565     }
566
567     private class InitialRemoveServerState extends RemoveServerState implements InitialOperationState{
568
569         protected InitialRemoveServerState(RemoveServerContext removeServerContext) {
570             super(removeServerContext);
571         }
572
573         @Override
574         public void initiate(RaftActor raftActor) {
575             raftContext.removePeer(getRemoveServerContext().getOperation().getServerId());
576             persistNewServerConfiguration(raftActor, getRemoveServerContext());
577         }
578     }
579
580     private static class RemoveServerContext extends ServerOperationContext<RemoveServer> {
581         private final String peerAddress;
582
583         RemoveServerContext(RemoveServer operation, String peerAddress, ActorRef clientRequestor) {
584             super(operation, clientRequestor);
585             this.peerAddress = peerAddress;
586         }
587
588         @Override
589         Object newReply(ServerChangeStatus status, String leaderId) {
590             return new RemoveServerReply(status, leaderId);
591         }
592
593         @Override
594         InitialOperationState newInitialOperationState(RaftActorServerConfigurationSupport support) {
595             return support.new InitialRemoveServerState(this);
596         }
597
598         @Override
599         void operationComplete(RaftActor raftActor, ServerChangeStatus serverChangeStatus) {
600             if(peerAddress != null) {
601                 raftActor.context().actorSelection(peerAddress).tell(new ServerRemoved(getOperation().getServerId()), raftActor.getSelf());
602             }
603         }
604
605         @Override
606         String getServerId() {
607             return getOperation().getServerId();
608         }
609     }
610
611     static class ServerOperationTimeout {
612         private final String serverId;
613
614         ServerOperationTimeout(String serverId){
615            this.serverId = Preconditions.checkNotNull(serverId, "serverId should not be null");
616         }
617
618         String getServerId() {
619             return serverId;
620         }
621     }
622 }