Implement RemoveServer for leader
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActorServerConfigurationSupport.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import akka.actor.ActorRef;
11 import akka.actor.ActorSelection;
12 import akka.actor.Cancellable;
13 import com.google.common.base.Preconditions;
14 import java.util.ArrayDeque;
15 import java.util.Queue;
16 import java.util.UUID;
17 import javax.annotation.Nullable;
18 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
19 import org.opendaylight.controller.cluster.raft.base.messages.SnapshotComplete;
20 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
21 import org.opendaylight.controller.cluster.raft.messages.AddServer;
22 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
23 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
24 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
25 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
26 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
27 import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
28 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31
32 /**
33  * Handles server configuration related messages for a RaftActor.
34  *
35  * @author Thomas Pantelis
36  */
37 class RaftActorServerConfigurationSupport {
38     private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupport.class);
39
40     private final OperationState IDLE = new Idle();
41
42     private final RaftActor raftActor;
43
44     private final RaftActorContext raftContext;
45
46     private final Queue<ServerOperationContext<?>> pendingOperationsQueue = new ArrayDeque<>();
47
48     private OperationState currentOperationState = IDLE;
49
50     RaftActorServerConfigurationSupport(RaftActor raftActor) {
51         this.raftActor = raftActor;
52         this.raftContext = raftActor.getRaftActorContext();
53     }
54
55     boolean handleMessage(Object message, ActorRef sender) {
56         if(message instanceof AddServer) {
57             onAddServer((AddServer) message, sender);
58             return true;
59         } else if(message instanceof RemoveServer) {
60             onRemoveServer((RemoveServer) message, sender);
61             return true;
62         } else if (message instanceof ServerOperationTimeout) {
63             currentOperationState.onServerOperationTimeout((ServerOperationTimeout) message);
64             return true;
65         } else if (message instanceof UnInitializedFollowerSnapshotReply) {
66             currentOperationState.onUnInitializedFollowerSnapshotReply((UnInitializedFollowerSnapshotReply) message);
67             return true;
68         } else if(message instanceof ApplyState) {
69             return onApplyState((ApplyState) message);
70         } else if(message instanceof SnapshotComplete) {
71             currentOperationState.onSnapshotComplete();
72             return false;
73         } else {
74             return false;
75         }
76     }
77
78     private void onRemoveServer(RemoveServer removeServer, ActorRef sender) {
79         LOG.debug("{}: onRemoveServer: {}, state: {}", raftContext.getId(), removeServer, currentOperationState);
80         boolean isSelf = removeServer.getServerId().equals(raftActor.getId());
81         if(!isSelf && !raftContext.getPeerIds().contains(removeServer.getServerId())) {
82             sender.tell(new RemoveServerReply(ServerChangeStatus.DOES_NOT_EXIST, raftActor.getLeaderId()),
83                     raftActor.getSelf());
84         } else {
85             String serverAddress = isSelf ? raftActor.self().path().toString() :
86                 raftContext.getPeerAddress(removeServer.getServerId());
87             onNewOperation(new RemoveServerContext(removeServer, serverAddress, sender));
88         }
89     }
90
91     private boolean onApplyState(ApplyState applyState) {
92         Payload data = applyState.getReplicatedLogEntry().getData();
93         if(data instanceof ServerConfigurationPayload) {
94             currentOperationState.onApplyState(applyState);
95             return true;
96         }
97
98         return false;
99     }
100
101     /**
102      * The algorithm for AddServer is as follows:
103      * <ul>
104      * <li>Add the new server as a peer.</li>
105      * <li>Add the new follower to the leader.</li>
106      * <li>If new server should be voting member</li>
107      * <ul>
108      *     <li>Initialize FollowerState to VOTING_NOT_INITIALIZED.</li>
109      *     <li>Initiate install snapshot to the new follower.</li>
110      *     <li>When install snapshot complete, mark the follower as VOTING and re-calculate majority vote count.</li>
111      * </ul>
112      * <li>Persist and replicate ServerConfigurationPayload with the new server list.</li>
113      * <li>On replication consensus, respond to caller with OK.</li>
114      * </ul>
115      * If the install snapshot times out after a period of 2 * election time out
116      * <ul>
117      *     <li>Remove the new server as a peer.</li>
118      *     <li>Remove the new follower from the leader.</li>
119      *     <li>Respond to caller with TIMEOUT.</li>
120      * </ul>
121      */
122     private void onAddServer(AddServer addServer, ActorRef sender) {
123         LOG.debug("{}: onAddServer: {}, state: {}", raftContext.getId(), addServer, currentOperationState);
124
125         onNewOperation(new AddServerContext(addServer, sender));
126     }
127
128     private void onNewOperation(ServerOperationContext<?> operationContext) {
129         if (raftActor.isLeader()) {
130             currentOperationState.onNewOperation(operationContext);
131         } else {
132             ActorSelection leader = raftActor.getLeader();
133             if (leader != null) {
134                 LOG.debug("{}: Not leader - forwarding to leader {}", raftContext.getId(), leader);
135                 leader.forward(operationContext.getOperation(), raftActor.getContext());
136             } else {
137                 LOG.debug("{}: No leader - returning NO_LEADER reply", raftContext.getId());
138                 operationContext.getClientRequestor().tell(operationContext.newReply(
139                         ServerChangeStatus.NO_LEADER, null), raftActor.self());
140             }
141         }
142     }
143
144     /**
145      * Interface for a server operation FSM state.
146      */
147     private interface OperationState {
148         void onNewOperation(ServerOperationContext<?> operationContext);
149
150         void onServerOperationTimeout(ServerOperationTimeout timeout);
151
152         void onUnInitializedFollowerSnapshotReply(UnInitializedFollowerSnapshotReply reply);
153
154         void onApplyState(ApplyState applyState);
155
156         void onSnapshotComplete();
157     }
158
159     /**
160      * Interface for the initial state for a server operation.
161      */
162     private interface InitialOperationState {
163         void initiate();
164     }
165
166     /**
167      * Abstract base class for a server operation FSM state. Handles common behavior for all states.
168      */
169     private abstract class AbstractOperationState implements OperationState {
170         @Override
171         public void onNewOperation(ServerOperationContext<?> operationContext) {
172             // We're currently processing another operation so queue it to be processed later.
173
174             LOG.debug("{}: Server operation already in progress - queueing {}", raftContext.getId(),
175                     operationContext.getOperation());
176
177             pendingOperationsQueue.add(operationContext);
178         }
179
180         @Override
181         public void onServerOperationTimeout(ServerOperationTimeout timeout) {
182             LOG.debug("onServerOperationTimeout should not be called in state {}", this);
183         }
184
185         @Override
186         public void onUnInitializedFollowerSnapshotReply(UnInitializedFollowerSnapshotReply reply) {
187             LOG.debug("onUnInitializedFollowerSnapshotReply was called in state {}", this);
188         }
189
190         @Override
191         public void onApplyState(ApplyState applyState) {
192             LOG.debug("onApplyState was called in state {}", this);
193         }
194
195         @Override
196         public void onSnapshotComplete() {
197         }
198
199         protected void persistNewServerConfiguration(ServerOperationContext<?> operationContext){
200             raftContext.setDynamicServerConfigurationInUse();
201
202             boolean includeSelf = !operationContext.getServerId().equals(raftActor.getId());
203             ServerConfigurationPayload payload = raftContext.getPeerServerInfo(includeSelf);
204             LOG.debug("{}: New server configuration : {}", raftContext.getId(), payload.getServerConfig());
205
206             raftActor.persistData(operationContext.getClientRequestor(), operationContext.getContextId(), payload);
207
208             currentOperationState = new Persisting(operationContext, newTimer(new ServerOperationTimeout(operationContext.getServerId())));
209
210             sendReply(operationContext, ServerChangeStatus.OK);
211         }
212
213         protected void operationComplete(ServerOperationContext<?> operationContext, @Nullable ServerChangeStatus replyStatus) {
214             if(replyStatus != null) {
215                 sendReply(operationContext, replyStatus);
216             }
217
218             operationContext.operationComplete(raftActor, replyStatus);
219
220             currentOperationState = IDLE;
221
222             ServerOperationContext<?> nextOperation = pendingOperationsQueue.poll();
223             if(nextOperation != null) {
224                 RaftActorServerConfigurationSupport.this.onNewOperation(nextOperation);
225             }
226         }
227
228         protected void sendReply(ServerOperationContext<?> operationContext, ServerChangeStatus status) {
229             LOG.debug("{}: Returning {} for operation {}", raftContext.getId(), status, operationContext.getOperation());
230
231             operationContext.getClientRequestor().tell(operationContext.newReply(status, raftActor.getLeaderId()),
232                     raftActor.self());
233         }
234
235         Cancellable newTimer(Object message) {
236             return raftContext.getActorSystem().scheduler().scheduleOnce(
237                     raftContext.getConfigParams().getElectionTimeOutInterval().$times(2), raftContext.getActor(), message,
238                             raftContext.getActorSystem().dispatcher(), raftContext.getActor());
239         }
240
241         @Override
242         public String toString() {
243             return getClass().getSimpleName();
244         }
245     }
246
247     /**
248      * The state when no server operation is in progress. It immediately initiates new server operations.
249      */
250     private class Idle extends AbstractOperationState {
251         @Override
252         public void onNewOperation(ServerOperationContext<?> operationContext) {
253             operationContext.newInitialOperationState(RaftActorServerConfigurationSupport.this).initiate();
254         }
255
256         @Override
257         public void onApplyState(ApplyState applyState) {
258             // Noop - we override b/c ApplyState is called normally for followers in the idle state.
259         }
260     }
261
262     /**
263      * The state when a new server configuration is being persisted and replicated.
264      */
265     private class Persisting extends AbstractOperationState {
266         private final ServerOperationContext<?> operationContext;
267         private final Cancellable timer;
268         private boolean timedOut = false;
269
270         Persisting(ServerOperationContext<?> operationContext, Cancellable timer) {
271             this.operationContext = operationContext;
272             this.timer = timer;
273         }
274
275         @Override
276         public void onApplyState(ApplyState applyState) {
277             // Sanity check - we could get an ApplyState from a previous operation that timed out so make
278             // sure it's meant for us.
279             if(operationContext.getContextId().equals(applyState.getIdentifier())) {
280                 LOG.info("{}: {} has been successfully replicated to a majority of followers", raftActor.getId(),
281                         applyState.getReplicatedLogEntry().getData());
282
283                 timer.cancel();
284                 operationComplete(operationContext, null);
285             }
286         }
287
288         @Override
289         public void onServerOperationTimeout(ServerOperationTimeout timeout) {
290             LOG.warn("{}: Timeout occured while replicating the new server configuration for {}", raftContext.getId(),
291                     timeout.getServerId());
292
293             timedOut = true;
294
295             // Fail any pending operations
296             ServerOperationContext<?> nextOperation = pendingOperationsQueue.poll();
297             while(nextOperation != null) {
298                 sendReply(nextOperation, ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT);
299                 nextOperation = pendingOperationsQueue.poll();
300             }
301         }
302
303         @Override
304         public void onNewOperation(ServerOperationContext<?> operationContext) {
305             if(timedOut) {
306                 sendReply(operationContext, ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT);
307             } else {
308                 super.onNewOperation(operationContext);
309             }
310         }
311     }
312
313     /**
314      * Abstract base class for an AddServer operation state.
315      */
316     private abstract class AddServerState extends AbstractOperationState {
317         private final AddServerContext addServerContext;
318
319         AddServerState(AddServerContext addServerContext) {
320             this.addServerContext = addServerContext;
321         }
322
323         AddServerContext getAddServerContext() {
324             return addServerContext;
325         }
326
327         Cancellable newInstallSnapshotTimer() {
328             return newTimer(new ServerOperationTimeout(addServerContext.getOperation().getNewServerId()));
329         }
330
331         void handleInstallSnapshotTimeout(ServerOperationTimeout timeout) {
332             String serverId = timeout.getServerId();
333
334             LOG.debug("{}: handleInstallSnapshotTimeout for new server {}", raftContext.getId(), serverId);
335
336             // cleanup
337             raftContext.removePeer(serverId);
338
339             boolean isLeader = raftActor.isLeader();
340             if(isLeader) {
341                 AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
342                 leader.removeFollower(serverId);
343             }
344
345             operationComplete(getAddServerContext(), isLeader ? ServerChangeStatus.TIMEOUT : ServerChangeStatus.NO_LEADER);
346         }
347
348     }
349
350     /**
351      * The initial state for the AddServer operation. It adds the new follower as a peer and initiates
352      * snapshot capture, if necessary.
353      */
354     private class InitialAddServerState extends AddServerState implements InitialOperationState {
355         InitialAddServerState(AddServerContext addServerContext) {
356             super(addServerContext);
357         }
358
359         @Override
360         public void initiate() {
361             AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
362             AddServer addServer = getAddServerContext().getOperation();
363
364             LOG.debug("{}: Initiating {}", raftContext.getId(), addServer);
365
366             if(raftContext.getPeerInfo(addServer.getNewServerId()) != null) {
367                 operationComplete(getAddServerContext(), ServerChangeStatus.ALREADY_EXISTS);
368                 return;
369             }
370
371             VotingState votingState = addServer.isVotingMember() ? VotingState.VOTING_NOT_INITIALIZED :
372                     VotingState.NON_VOTING;
373             raftContext.addToPeers(addServer.getNewServerId(), addServer.getNewServerAddress(), votingState);
374
375             leader.addFollower(addServer.getNewServerId());
376
377             if(votingState == VotingState.VOTING_NOT_INITIALIZED){
378                 // schedule the install snapshot timeout timer
379                 Cancellable installSnapshotTimer = newInstallSnapshotTimer();
380                 if(leader.initiateCaptureSnapshot(addServer.getNewServerId())) {
381                     LOG.debug("{}: Initiating capture snapshot for new server {}", raftContext.getId(),
382                             addServer.getNewServerId());
383
384                     currentOperationState = new InstallingSnapshot(getAddServerContext(), installSnapshotTimer);
385                 } else {
386                     LOG.debug("{}: Snapshot already in progress - waiting for completion", raftContext.getId());
387
388                     currentOperationState = new WaitingForPriorSnapshotComplete(getAddServerContext(),
389                             installSnapshotTimer);
390                 }
391             } else {
392                 LOG.debug("{}: New follower is non-voting - directly persisting new server configuration",
393                         raftContext.getId());
394
395                 persistNewServerConfiguration(getAddServerContext());
396             }
397         }
398     }
399
400     /**
401      * The AddServer operation state for when the catch-up snapshot is being installed. It handles successful
402      * reply or timeout.
403      */
404     private class InstallingSnapshot extends AddServerState {
405         private final Cancellable installSnapshotTimer;
406
407         InstallingSnapshot(AddServerContext addServerContext, Cancellable installSnapshotTimer) {
408             super(addServerContext);
409             this.installSnapshotTimer = Preconditions.checkNotNull(installSnapshotTimer);
410         }
411
412         @Override
413         public void onServerOperationTimeout(ServerOperationTimeout timeout) {
414             handleInstallSnapshotTimeout(timeout);
415
416             LOG.warn("{}: Timeout occured for new server {} while installing snapshot", raftContext.getId(),
417                     timeout.getServerId());
418         }
419
420         @Override
421         public void onUnInitializedFollowerSnapshotReply(UnInitializedFollowerSnapshotReply reply) {
422             LOG.debug("{}: onUnInitializedFollowerSnapshotReply: {}", raftContext.getId(), reply);
423
424             String followerId = reply.getFollowerId();
425
426             // Sanity check to guard against receiving an UnInitializedFollowerSnapshotReply from a prior
427             // add server operation that timed out.
428             if(getAddServerContext().getOperation().getNewServerId().equals(followerId) && raftActor.isLeader()) {
429                 AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
430                 raftContext.getPeerInfo(followerId).setVotingState(VotingState.VOTING);
431                 leader.updateMinReplicaCount();
432
433                 persistNewServerConfiguration(getAddServerContext());
434
435                 installSnapshotTimer.cancel();
436             } else {
437                 LOG.debug("{}: Dropping UnInitializedFollowerSnapshotReply for server {}: {}",
438                         raftContext.getId(), followerId,
439                         !raftActor.isLeader() ? "not leader" : "server Id doesn't match");
440             }
441         }
442     }
443
444     /**
445      * The AddServer operation state for when there is a snapshot already in progress. When the current
446      * snapshot completes, it initiates an install snapshot.
447      */
448     private class WaitingForPriorSnapshotComplete extends AddServerState {
449         private final Cancellable snapshotTimer;
450
451         WaitingForPriorSnapshotComplete(AddServerContext addServerContext, Cancellable snapshotTimer) {
452             super(addServerContext);
453             this.snapshotTimer = Preconditions.checkNotNull(snapshotTimer);
454         }
455
456         @Override
457         public void onSnapshotComplete() {
458             LOG.debug("{}: onSnapshotComplete", raftContext.getId());
459
460             if(!raftActor.isLeader()) {
461                 LOG.debug("{}: No longer the leader", raftContext.getId());
462                 return;
463             }
464
465             AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
466             if(leader.initiateCaptureSnapshot(getAddServerContext().getOperation().getNewServerId())) {
467                 LOG.debug("{}: Initiating capture snapshot for new server {}", raftContext.getId(),
468                         getAddServerContext().getOperation().getNewServerId());
469
470                 currentOperationState = new InstallingSnapshot(getAddServerContext(),
471                         newInstallSnapshotTimer());
472
473                 snapshotTimer.cancel();
474             }
475         }
476
477         @Override
478         public void onServerOperationTimeout(ServerOperationTimeout timeout) {
479             handleInstallSnapshotTimeout(timeout);
480
481             LOG.warn("{}: Timeout occured for new server {} while waiting for prior snapshot to complete",
482                     raftContext.getId(), timeout.getServerId());
483         }
484     }
485
486     /**
487      * Stores context information for a server operation.
488      *
489      * @param <T> the operation type
490      */
491     private static abstract class ServerOperationContext<T> {
492         private final T operation;
493         private final ActorRef clientRequestor;
494         private final String contextId;
495
496         ServerOperationContext(T operation, ActorRef clientRequestor){
497             this.operation = operation;
498             this.clientRequestor = clientRequestor;
499             contextId = UUID.randomUUID().toString();
500         }
501
502         String getContextId() {
503             return contextId;
504         }
505
506         T getOperation() {
507             return operation;
508         }
509
510         ActorRef getClientRequestor() {
511             return clientRequestor;
512         }
513
514         abstract Object newReply(ServerChangeStatus status, String leaderId);
515
516         abstract InitialOperationState newInitialOperationState(RaftActorServerConfigurationSupport support);
517
518         abstract void operationComplete(RaftActor raftActor, ServerChangeStatus serverChangeStatus);
519
520         abstract String getServerId();
521     }
522
523     /**
524      * Stores context information for an AddServer operation.
525      */
526     private static class AddServerContext extends ServerOperationContext<AddServer> {
527         AddServerContext(AddServer addServer, ActorRef clientRequestor) {
528             super(addServer, clientRequestor);
529         }
530
531         @Override
532         Object newReply(ServerChangeStatus status, String leaderId) {
533             return new AddServerReply(status, leaderId);
534         }
535
536         @Override
537         InitialOperationState newInitialOperationState(RaftActorServerConfigurationSupport support) {
538             return support.new InitialAddServerState(this);
539         }
540
541         @Override
542         void operationComplete(RaftActor raftActor, ServerChangeStatus serverChangeStatus) {
543
544         }
545
546         @Override
547         String getServerId() {
548             return getOperation().getNewServerId();
549         }
550     }
551
552     private abstract class RemoveServerState extends AbstractOperationState {
553         private final RemoveServerContext removeServerContext;
554
555
556         protected RemoveServerState(RemoveServerContext removeServerContext) {
557             this.removeServerContext = Preconditions.checkNotNull(removeServerContext);
558
559         }
560
561         public RemoveServerContext getRemoveServerContext() {
562             return removeServerContext;
563         }
564     }
565
566     private class InitialRemoveServerState extends RemoveServerState implements InitialOperationState{
567
568         protected InitialRemoveServerState(RemoveServerContext removeServerContext) {
569             super(removeServerContext);
570         }
571
572         @Override
573         public void initiate() {
574             raftContext.removePeer(getRemoveServerContext().getOperation().getServerId());
575             persistNewServerConfiguration(getRemoveServerContext());
576         }
577     }
578
579     private static class RemoveServerContext extends ServerOperationContext<RemoveServer> {
580         private final String peerAddress;
581
582         RemoveServerContext(RemoveServer operation, String peerAddress, ActorRef clientRequestor) {
583             super(operation, clientRequestor);
584             this.peerAddress = peerAddress;
585         }
586
587         @Override
588         Object newReply(ServerChangeStatus status, String leaderId) {
589             return new RemoveServerReply(status, leaderId);
590         }
591
592         @Override
593         InitialOperationState newInitialOperationState(RaftActorServerConfigurationSupport support) {
594             return support.new InitialRemoveServerState(this);
595         }
596
597         @Override
598         void operationComplete(RaftActor raftActor, ServerChangeStatus serverChangeStatus) {
599             if(peerAddress != null) {
600                 raftActor.context().actorSelection(peerAddress).tell(new ServerRemoved(getOperation().getServerId()), raftActor.getSelf());
601             }
602         }
603
604         @Override
605         String getServerId() {
606             return getOperation().getServerId();
607         }
608     }
609
610     static class ServerOperationTimeout {
611         private final String serverId;
612
613         ServerOperationTimeout(String serverId){
614            this.serverId = Preconditions.checkNotNull(serverId, "serverId should not be null");
615         }
616
617         String getServerId() {
618             return serverId;
619         }
620     }
621 }