207642a7213e637fc8af6539ea9e989c0a75d43e
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActorServerConfigurationSupport.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import akka.actor.ActorRef;
11 import akka.actor.ActorSelection;
12 import akka.actor.Cancellable;
13 import com.google.common.base.Preconditions;
14 import java.util.LinkedList;
15 import java.util.Queue;
16 import java.util.UUID;
17 import java.util.concurrent.TimeUnit;
18 import javax.annotation.Nullable;
19 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
20 import org.opendaylight.controller.cluster.raft.base.messages.SnapshotComplete;
21 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
22 import org.opendaylight.controller.cluster.raft.messages.AddServer;
23 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
24 import org.opendaylight.controller.cluster.raft.messages.FollowerCatchUpTimeout;
25 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
26 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
27 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
28 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
29 import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
30 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33 import scala.concurrent.duration.FiniteDuration;
34
35 /**
36  * Handles server configuration related messages for a RaftActor.
37  *
38  * @author Thomas Pantelis
39  */
40 class RaftActorServerConfigurationSupport {
41     private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupport.class);
42
43     private final OperationState IDLE = new Idle();
44
45     private final RaftActorContext raftContext;
46
47     private final Queue<ServerOperationContext<?>> pendingOperationsQueue = new LinkedList<>();
48
49     private OperationState currentOperationState = IDLE;
50
51     RaftActorServerConfigurationSupport(RaftActorContext context) {
52         this.raftContext = context;
53     }
54
55     boolean handleMessage(Object message, RaftActor raftActor, ActorRef sender) {
56         if(message instanceof AddServer) {
57             onAddServer((AddServer) message, raftActor, sender);
58             return true;
59         } else if(message instanceof RemoveServer) {
60             onRemoveServer((RemoveServer) message, raftActor, sender);
61             return true;
62         } else if (message instanceof FollowerCatchUpTimeout) {
63             currentOperationState.onFollowerCatchupTimeout(raftActor, (FollowerCatchUpTimeout) message);
64             return true;
65         } else if (message instanceof UnInitializedFollowerSnapshotReply) {
66             currentOperationState.onUnInitializedFollowerSnapshotReply(raftActor,
67                     (UnInitializedFollowerSnapshotReply) message);
68             return true;
69         } else if(message instanceof ApplyState) {
70             return onApplyState((ApplyState) message, raftActor);
71         } else if(message instanceof SnapshotComplete) {
72             currentOperationState.onSnapshotComplete(raftActor);
73             return false;
74         } else {
75             return false;
76         }
77     }
78
79     private void onRemoveServer(RemoveServer removeServer, RaftActor raftActor, ActorRef sender) {
80         if(removeServer.getServerId().equals(raftActor.getLeaderId())){
81             // Removing current leader is not supported yet
82             // TODO: To properly support current leader removal we need to first implement transfer of leadership
83             LOG.debug("Cannot remove {} replica because it is the Leader", removeServer.getServerId());
84             sender.tell(new RemoveServerReply(ServerChangeStatus.NOT_SUPPORTED, raftActor.getLeaderId()), raftActor.getSelf());
85         } else if(!raftContext.getPeerIds().contains(removeServer.getServerId())) {
86             sender.tell(new RemoveServerReply(ServerChangeStatus.DOES_NOT_EXIST, raftActor.getLeaderId()), raftActor.getSelf());
87         } else {
88             onNewOperation(raftActor, new RemoveServerContext(removeServer, raftContext.getPeerAddress(removeServer.getServerId()), sender));
89         }
90     }
91
92     private boolean onApplyState(ApplyState applyState, RaftActor raftActor) {
93         Payload data = applyState.getReplicatedLogEntry().getData();
94         if(data instanceof ServerConfigurationPayload) {
95             currentOperationState.onApplyState(raftActor, applyState);
96             return true;
97         }
98
99         return false;
100     }
101
102     /**
103      * The algorithm for AddServer is as follows:
104      * <ul>
105      * <li>Add the new server as a peer.</li>
106      * <li>Add the new follower to the leader.</li>
107      * <li>If new server should be voting member</li>
108      * <ul>
109      *     <li>Initialize FollowerState to VOTING_NOT_INITIALIZED.</li>
110      *     <li>Initiate install snapshot to the new follower.</li>
111      *     <li>When install snapshot complete, mark the follower as VOTING and re-calculate majority vote count.</li>
112      * </ul>
113      * <li>Persist and replicate ServerConfigurationPayload with the new server list.</li>
114      * <li>On replication consensus, respond to caller with OK.</li>
115      * </ul>
116      * If the install snapshot times out after a period of 2 * election time out
117      * <ul>
118      *     <li>Remove the new server as a peer.</li>
119      *     <li>Remove the new follower from the leader.</li>
120      *     <li>Respond to caller with TIMEOUT.</li>
121      * </ul>
122      */
123     private void onAddServer(AddServer addServer, RaftActor raftActor, ActorRef sender) {
124         LOG.debug("{}: onAddServer: {}", raftContext.getId(), addServer);
125
126         onNewOperation(raftActor, new AddServerContext(addServer, sender));
127     }
128
129     private void onNewOperation(RaftActor raftActor, ServerOperationContext<?> operationContext) {
130         if (raftActor.isLeader()) {
131             currentOperationState.onNewOperation(raftActor, operationContext);
132         } else {
133             ActorSelection leader = raftActor.getLeader();
134             if (leader != null) {
135                 LOG.debug("{}: Not leader - forwarding to leader {}", raftContext.getId(), leader);
136                 leader.forward(operationContext.getOperation(), raftActor.getContext());
137             } else {
138                 LOG.debug("{}: No leader - returning NO_LEADER reply", raftContext.getId());
139                 operationContext.getClientRequestor().tell(operationContext.newReply(
140                         ServerChangeStatus.NO_LEADER, null), raftActor.self());
141             }
142         }
143     }
144
145     /**
146      * Interface for a server operation FSM state.
147      */
148     private interface OperationState {
149         void onNewOperation(RaftActor raftActor, ServerOperationContext<?> operationContext);
150
151         void onFollowerCatchupTimeout(RaftActor raftActor, FollowerCatchUpTimeout followerTimeout);
152
153         void onUnInitializedFollowerSnapshotReply(RaftActor raftActor, UnInitializedFollowerSnapshotReply reply);
154
155         void onApplyState(RaftActor raftActor, ApplyState applyState);
156
157         void onSnapshotComplete(RaftActor raftActor);
158     }
159
160     /**
161      * Interface for the initial state for a server operation.
162      */
163     private interface InitialOperationState {
164         void initiate(RaftActor raftActor);
165     }
166
167     /**
168      * Abstract base class for server operation FSM state. Handles common behavior for all states.
169      */
170     private abstract class AbstractOperationState implements OperationState {
171         @Override
172         public void onNewOperation(RaftActor raftActor, ServerOperationContext<?> operationContext) {
173             // We're currently processing another operation so queue it to be processed later.
174
175             LOG.debug("{}: Server operation already in progress - queueing {}", raftContext.getId(),
176                     operationContext.getOperation());
177
178             pendingOperationsQueue.add(operationContext);
179         }
180
181         @Override
182         public void onFollowerCatchupTimeout(RaftActor raftActor, FollowerCatchUpTimeout followerTimeout) {
183             LOG.debug("onFollowerCatchupTimeout should not be called in state {}", this);
184         }
185
186         @Override
187         public void onUnInitializedFollowerSnapshotReply(RaftActor raftActor, UnInitializedFollowerSnapshotReply reply) {
188             LOG.debug("onUnInitializedFollowerSnapshotReply was called in state {}", this);
189         }
190
191         @Override
192         public void onApplyState(RaftActor raftActor, ApplyState applyState) {
193             LOG.debug("onApplyState was called in state {}", this);
194         }
195
196         @Override
197         public void onSnapshotComplete(RaftActor raftActor) {
198         }
199
200         protected void persistNewServerConfiguration(RaftActor raftActor, ServerOperationContext<?> operationContext){
201             raftContext.setDynamicServerConfigurationInUse();
202             ServerConfigurationPayload payload = raftContext.getPeerServerInfo();
203             LOG.debug("{}: New server configuration : {}", raftContext.getId(), payload.getServerConfig());
204
205             raftActor.persistData(operationContext.getClientRequestor(), operationContext.getContextId(), payload);
206
207             currentOperationState = new Persisting(operationContext);
208
209             sendReply(raftActor, operationContext, ServerChangeStatus.OK);
210         }
211
212         protected void operationComplete(RaftActor raftActor, ServerOperationContext<?> operationContext,
213                 @Nullable ServerChangeStatus replyStatus) {
214             if(replyStatus != null) {
215                 sendReply(raftActor, operationContext, replyStatus);
216             }
217
218             operationContext.operationComplete(raftActor, replyStatus);
219
220             currentOperationState = IDLE;
221
222             ServerOperationContext<?> nextOperation = pendingOperationsQueue.poll();
223             if(nextOperation != null) {
224                 RaftActorServerConfigurationSupport.this.onNewOperation(raftActor, nextOperation);
225             }
226         }
227
228         private void sendReply(RaftActor raftActor, ServerOperationContext<?> operationContext,
229                 ServerChangeStatus status) {
230             LOG.debug("{}: Returning {} for operation {}", raftContext.getId(), status, operationContext.getOperation());
231
232             operationContext.getClientRequestor().tell(operationContext.newReply(status, raftActor.getLeaderId()),
233                     raftActor.self());
234         }
235
236         @Override
237         public String toString() {
238             return getClass().getSimpleName();
239         }
240     }
241
242     /**
243      * The state when no server operation is in progress. It immediately initiates new server operations.
244      */
245     private class Idle extends AbstractOperationState {
246         @Override
247         public void onNewOperation(RaftActor raftActor, ServerOperationContext<?> operationContext) {
248             operationContext.newInitialOperationState(RaftActorServerConfigurationSupport.this).initiate(raftActor);
249         }
250
251         @Override
252         public void onApplyState(RaftActor raftActor, ApplyState applyState) {
253             // Noop - we override b/c ApplyState is called normally for followers in the idle state.
254         }
255     }
256
257     /**
258      * The state when a new server configuration is being persisted and replicated.
259      */
260     private class Persisting extends AbstractOperationState {
261         private final ServerOperationContext<?> operationContext;
262
263         Persisting(ServerOperationContext<?> operationContext) {
264             this.operationContext = operationContext;
265         }
266
267         @Override
268         public void onApplyState(RaftActor raftActor, ApplyState applyState) {
269             // Sanity check - we could get an ApplyState from a previous operation that timed out so make
270             // sure it's meant for us.
271             if(operationContext.getContextId().equals(applyState.getIdentifier())) {
272                 LOG.info("{}: {} has been successfully replicated to a majority of followers", raftActor.getId(),
273                         applyState.getReplicatedLogEntry().getData());
274
275                 operationComplete(raftActor, operationContext, null);
276             }
277         }
278     }
279
280     /**
281      * Abstract base class for an AddServer operation state.
282      */
283     private abstract class AddServerState extends AbstractOperationState {
284         private final AddServerContext addServerContext;
285
286         AddServerState(AddServerContext addServerContext) {
287             this.addServerContext = addServerContext;
288         }
289
290         AddServerContext getAddServerContext() {
291             return addServerContext;
292         }
293
294         Cancellable newInstallSnapshotTimer(RaftActor raftActor) {
295             return raftContext.getActorSystem().scheduler().scheduleOnce(
296                     new FiniteDuration(((raftContext.getConfigParams().getElectionTimeOutInterval().toMillis()) * 2),
297                             TimeUnit.MILLISECONDS), raftContext.getActor(),
298                             new FollowerCatchUpTimeout(addServerContext.getOperation().getNewServerId()),
299                             raftContext.getActorSystem().dispatcher(), raftContext.getActor());
300         }
301
302         void handleOnFollowerCatchupTimeout(RaftActor raftActor, FollowerCatchUpTimeout followerTimeout) {
303             String serverId = followerTimeout.getNewServerId();
304
305             LOG.debug("{}: onFollowerCatchupTimeout for new server {}", raftContext.getId(), serverId);
306
307             // cleanup
308             raftContext.removePeer(serverId);
309
310             boolean isLeader = raftActor.isLeader();
311             if(isLeader) {
312                 AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
313                 leader.removeFollower(serverId);
314             }
315
316             operationComplete(raftActor, getAddServerContext(),
317                     isLeader ? ServerChangeStatus.TIMEOUT : ServerChangeStatus.NO_LEADER);
318         }
319
320     }
321
322     /**
323      * The initial state for the AddServer operation. It adds the new follower as a peer and initiates
324      * snapshot capture, if necessary.
325      */
326     private class InitialAddServerState extends AddServerState implements InitialOperationState {
327         InitialAddServerState(AddServerContext addServerContext) {
328             super(addServerContext);
329         }
330
331         @Override
332         public void initiate(RaftActor raftActor) {
333             AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
334             AddServer addServer = getAddServerContext().getOperation();
335
336             LOG.debug("{}: Initiating {}", raftContext.getId(), addServer);
337
338             if(raftContext.getPeerInfo(addServer.getNewServerId()) != null) {
339                 operationComplete(raftActor, getAddServerContext(), ServerChangeStatus.ALREADY_EXISTS);
340                 return;
341             }
342
343             VotingState votingState = addServer.isVotingMember() ? VotingState.VOTING_NOT_INITIALIZED :
344                     VotingState.NON_VOTING;
345             raftContext.addToPeers(addServer.getNewServerId(), addServer.getNewServerAddress(), votingState);
346
347             leader.addFollower(addServer.getNewServerId());
348
349             if(votingState == VotingState.VOTING_NOT_INITIALIZED){
350                 // schedule the install snapshot timeout timer
351                 Cancellable installSnapshotTimer = newInstallSnapshotTimer(raftActor);
352                 if(leader.initiateCaptureSnapshot(addServer.getNewServerId())) {
353                     LOG.debug("{}: Initiating capture snapshot for new server {}", raftContext.getId(),
354                             addServer.getNewServerId());
355
356                     currentOperationState = new InstallingSnapshot(getAddServerContext(), installSnapshotTimer);
357                 } else {
358                     LOG.debug("{}: Snapshot already in progress - waiting for completion", raftContext.getId());
359
360                     currentOperationState = new WaitingForPriorSnapshotComplete(getAddServerContext(),
361                             installSnapshotTimer);
362                 }
363             } else {
364                 LOG.debug("{}: New follower is non-voting - directly persisting new server configuration",
365                         raftContext.getId());
366
367                 persistNewServerConfiguration(raftActor, getAddServerContext());
368             }
369         }
370     }
371
372     /**
373      * The AddServer operation state for when the catch-up snapshot is being installed. It handles successful
374      * reply or timeout.
375      */
376     private class InstallingSnapshot extends AddServerState {
377         private final Cancellable installSnapshotTimer;
378
379         InstallingSnapshot(AddServerContext addServerContext, Cancellable installSnapshotTimer) {
380             super(addServerContext);
381             this.installSnapshotTimer = Preconditions.checkNotNull(installSnapshotTimer);
382         }
383
384         @Override
385         public void onFollowerCatchupTimeout(RaftActor raftActor, FollowerCatchUpTimeout followerTimeout) {
386             handleOnFollowerCatchupTimeout(raftActor, followerTimeout);
387
388             LOG.warn("{}: Timeout occured for new server {} while installing snapshot", raftContext.getId(),
389                     followerTimeout.getNewServerId());
390         }
391
392         @Override
393         public void onUnInitializedFollowerSnapshotReply(RaftActor raftActor, UnInitializedFollowerSnapshotReply reply) {
394             LOG.debug("{}: onUnInitializedFollowerSnapshotReply: {}", raftContext.getId(), reply);
395
396             String followerId = reply.getFollowerId();
397
398             // Sanity check to guard against receiving an UnInitializedFollowerSnapshotReply from a prior
399             // add server operation that timed out.
400             if(getAddServerContext().getOperation().getNewServerId().equals(followerId) && raftActor.isLeader()) {
401                 AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
402                 raftContext.getPeerInfo(followerId).setVotingState(VotingState.VOTING);
403                 leader.updateMinReplicaCount();
404
405                 persistNewServerConfiguration(raftActor, getAddServerContext());
406
407                 installSnapshotTimer.cancel();
408             } else {
409                 LOG.debug("{}: Dropping UnInitializedFollowerSnapshotReply for server {}: {}",
410                         raftContext.getId(), followerId,
411                         !raftActor.isLeader() ? "not leader" : "server Id doesn't match");
412             }
413         }
414     }
415
416     /**
417      * The AddServer operation state for when there is a snapshot already in progress. When the current
418      * snapshot completes, it initiates an install snapshot.
419      */
420     private class WaitingForPriorSnapshotComplete extends AddServerState {
421         private final Cancellable snapshotTimer;
422
423         WaitingForPriorSnapshotComplete(AddServerContext addServerContext, Cancellable snapshotTimer) {
424             super(addServerContext);
425             this.snapshotTimer = Preconditions.checkNotNull(snapshotTimer);
426         }
427
428         @Override
429         public void onSnapshotComplete(RaftActor raftActor) {
430             LOG.debug("{}: onSnapshotComplete", raftContext.getId());
431
432             if(!raftActor.isLeader()) {
433                 LOG.debug("{}: No longer the leader", raftContext.getId());
434                 return;
435             }
436
437             AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
438             if(leader.initiateCaptureSnapshot(getAddServerContext().getOperation().getNewServerId())) {
439                 LOG.debug("{}: Initiating capture snapshot for new server {}", raftContext.getId(),
440                         getAddServerContext().getOperation().getNewServerId());
441
442                 currentOperationState = new InstallingSnapshot(getAddServerContext(),
443                         newInstallSnapshotTimer(raftActor));
444
445                 snapshotTimer.cancel();
446             }
447         }
448
449         @Override
450         public void onFollowerCatchupTimeout(RaftActor raftActor, FollowerCatchUpTimeout followerTimeout) {
451             handleOnFollowerCatchupTimeout(raftActor, followerTimeout);
452
453             LOG.warn("{}: Timeout occured for new server {} while waiting for prior snapshot to complete",
454                     raftContext.getId(), followerTimeout.getNewServerId());
455         }
456     }
457
458     /**
459      * Stores context information for a server operation.
460      *
461      * @param <T> the operation type
462      */
463     private static abstract class ServerOperationContext<T> {
464         private final T operation;
465         private final ActorRef clientRequestor;
466         private final String contextId;
467
468         ServerOperationContext(T operation, ActorRef clientRequestor){
469             this.operation = operation;
470             this.clientRequestor = clientRequestor;
471             contextId = UUID.randomUUID().toString();
472         }
473
474         String getContextId() {
475             return contextId;
476         }
477
478         T getOperation() {
479             return operation;
480         }
481
482         ActorRef getClientRequestor() {
483             return clientRequestor;
484         }
485
486         abstract Object newReply(ServerChangeStatus status, String leaderId);
487
488         abstract InitialOperationState newInitialOperationState(RaftActorServerConfigurationSupport support);
489
490         abstract void operationComplete(RaftActor raftActor, ServerChangeStatus serverChangeStatus);
491     }
492
493     /**
494      * Stores context information for an AddServer operation.
495      */
496     private static class AddServerContext extends ServerOperationContext<AddServer> {
497         AddServerContext(AddServer addServer, ActorRef clientRequestor) {
498             super(addServer, clientRequestor);
499         }
500
501         @Override
502         Object newReply(ServerChangeStatus status, String leaderId) {
503             return new AddServerReply(status, leaderId);
504         }
505
506         @Override
507         InitialOperationState newInitialOperationState(RaftActorServerConfigurationSupport support) {
508             return support.new InitialAddServerState(this);
509         }
510
511         @Override
512         void operationComplete(RaftActor raftActor, ServerChangeStatus serverChangeStatus) {
513
514         }
515     }
516
517     private abstract class RemoveServerState extends AbstractOperationState {
518         private final RemoveServerContext removeServerContext;
519
520
521         protected RemoveServerState(RemoveServerContext removeServerContext) {
522             this.removeServerContext = Preconditions.checkNotNull(removeServerContext);
523
524         }
525
526         public RemoveServerContext getRemoveServerContext() {
527             return removeServerContext;
528         }
529
530     }
531
532     private class InitialRemoveServerState extends RemoveServerState implements InitialOperationState{
533
534         protected InitialRemoveServerState(RemoveServerContext removeServerContext) {
535             super(removeServerContext);
536         }
537
538         @Override
539         public void initiate(RaftActor raftActor) {
540             raftContext.removePeer(getRemoveServerContext().getOperation().getServerId());
541             persistNewServerConfiguration(raftActor, getRemoveServerContext());
542         }
543     }
544
545     private static class RemoveServerContext extends ServerOperationContext<RemoveServer> {
546         private final String peerAddress;
547
548         RemoveServerContext(RemoveServer operation, String peerAddress, ActorRef clientRequestor) {
549             super(operation, clientRequestor);
550             this.peerAddress = peerAddress;
551         }
552
553         @Override
554         Object newReply(ServerChangeStatus status, String leaderId) {
555             return new RemoveServerReply(status, leaderId);
556         }
557
558         @Override
559         InitialOperationState newInitialOperationState(RaftActorServerConfigurationSupport support) {
560             return support.new InitialRemoveServerState(this);
561         }
562
563         @Override
564         void operationComplete(RaftActor raftActor, ServerChangeStatus serverChangeStatus) {
565             if(peerAddress != null) {
566                 raftActor.context().actorSelection(peerAddress).tell(new ServerRemoved(getOperation().getServerId()), raftActor.getSelf());
567             }
568         }
569
570     }
571 }