Bug 2187: Persisting Actor peerIds in snapshot
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActorServerConfigurationSupport.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import akka.actor.ActorRef;
11 import akka.actor.ActorSelection;
12 import akka.actor.Cancellable;
13 import com.google.common.base.Preconditions;
14 import java.util.LinkedList;
15 import java.util.Queue;
16 import java.util.UUID;
17 import java.util.concurrent.TimeUnit;
18 import javax.annotation.Nullable;
19 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
20 import org.opendaylight.controller.cluster.raft.base.messages.SnapshotComplete;
21 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
22 import org.opendaylight.controller.cluster.raft.messages.AddServer;
23 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
24 import org.opendaylight.controller.cluster.raft.messages.FollowerCatchUpTimeout;
25 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
26 import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
27 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30 import scala.concurrent.duration.FiniteDuration;
31
32 /**
33  * Handles server configuration related messages for a RaftActor.
34  *
35  * @author Thomas Pantelis
36  */
37 class RaftActorServerConfigurationSupport {
38     private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupport.class);
39
40     private final OperationState IDLE = new Idle();
41
42     private final RaftActorContext raftContext;
43
44     private final Queue<ServerOperationContext<?>> pendingOperationsQueue = new LinkedList<>();
45
46     private OperationState currentOperationState = IDLE;
47
48     RaftActorServerConfigurationSupport(RaftActorContext context) {
49         this.raftContext = context;
50     }
51
52     boolean handleMessage(Object message, RaftActor raftActor, ActorRef sender) {
53         if(message instanceof AddServer) {
54             onAddServer((AddServer)message, raftActor, sender);
55             return true;
56         } else if (message instanceof FollowerCatchUpTimeout) {
57             currentOperationState.onFollowerCatchupTimeout(raftActor, (FollowerCatchUpTimeout)message);
58             return true;
59         } else if (message instanceof UnInitializedFollowerSnapshotReply) {
60             currentOperationState.onUnInitializedFollowerSnapshotReply(raftActor,
61                     (UnInitializedFollowerSnapshotReply)message);
62             return true;
63         } else if(message instanceof ApplyState) {
64             return onApplyState((ApplyState) message, raftActor);
65         } else if(message instanceof SnapshotComplete) {
66             currentOperationState.onSnapshotComplete(raftActor);
67             return false;
68         } else {
69             return false;
70         }
71     }
72
73     private boolean onApplyState(ApplyState applyState, RaftActor raftActor) {
74         Payload data = applyState.getReplicatedLogEntry().getData();
75         if(data instanceof ServerConfigurationPayload) {
76             currentOperationState.onApplyState(raftActor, applyState);
77             return true;
78         }
79
80         return false;
81     }
82
83     /**
84      * The algorithm for AddServer is as follows:
85      * <ul>
86      * <li>Add the new server as a peer.</li>
87      * <li>Add the new follower to the leader.</li>
88      * <li>If new server should be voting member</li>
89      * <ul>
90      *     <li>Initialize FollowerState to VOTING_NOT_INITIALIZED.</li>
91      *     <li>Initiate install snapshot to the new follower.</li>
92      *     <li>When install snapshot complete, mark the follower as VOTING and re-calculate majority vote count.</li>
93      * </ul>
94      * <li>Persist and replicate ServerConfigurationPayload with the new server list.</li>
95      * <li>On replication consensus, respond to caller with OK.</li>
96      * </ul>
97      * If the install snapshot times out after a period of 2 * election time out
98      * <ul>
99      *     <li>Remove the new server as a peer.</li>
100      *     <li>Remove the new follower from the leader.</li>
101      *     <li>Respond to caller with TIMEOUT.</li>
102      * </ul>
103      */
104     private void onAddServer(AddServer addServer, RaftActor raftActor, ActorRef sender) {
105         LOG.debug("{}: onAddServer: {}", raftContext.getId(), addServer);
106
107         onNewOperation(raftActor, new AddServerContext(addServer, sender));
108     }
109
110     private void onNewOperation(RaftActor raftActor, ServerOperationContext<?> operationContext) {
111         if (raftActor.isLeader()) {
112             currentOperationState.onNewOperation(raftActor, operationContext);
113         } else {
114             ActorSelection leader = raftActor.getLeader();
115             if (leader != null) {
116                 LOG.debug("{}: Not leader - forwarding to leader {}", raftContext.getId(), leader);
117                 leader.forward(operationContext.getOperation(), raftActor.getContext());
118             } else {
119                 LOG.debug("{}: No leader - returning NO_LEADER reply", raftContext.getId());
120                 operationContext.getClientRequestor().tell(operationContext.newReply(
121                         ServerChangeStatus.NO_LEADER, null), raftActor.self());
122             }
123         }
124     }
125
126     /**
127      * Interface for a server operation FSM state.
128      */
129     private interface OperationState {
130         void onNewOperation(RaftActor raftActor, ServerOperationContext<?> operationContext);
131
132         void onFollowerCatchupTimeout(RaftActor raftActor, FollowerCatchUpTimeout followerTimeout);
133
134         void onUnInitializedFollowerSnapshotReply(RaftActor raftActor, UnInitializedFollowerSnapshotReply reply);
135
136         void onApplyState(RaftActor raftActor, ApplyState applyState);
137
138         void onSnapshotComplete(RaftActor raftActor);
139     }
140
141     /**
142      * Interface for the initial state for a server operation.
143      */
144     private interface InitialOperationState {
145         void initiate(RaftActor raftActor);
146     }
147
148     /**
149      * Abstract base class for server operation FSM state. Handles common behavior for all states.
150      */
151     private abstract class AbstractOperationState implements OperationState {
152         @Override
153         public void onNewOperation(RaftActor raftActor, ServerOperationContext<?> operationContext) {
154             // We're currently processing another operation so queue it to be processed later.
155
156             LOG.debug("{}: Server operation already in progress - queueing {}", raftContext.getId(),
157                     operationContext.getOperation());
158
159             pendingOperationsQueue.add(operationContext);
160         }
161
162         @Override
163         public void onFollowerCatchupTimeout(RaftActor raftActor, FollowerCatchUpTimeout followerTimeout) {
164             LOG.debug("onFollowerCatchupTimeout should not be called in state {}", this);
165         }
166
167         @Override
168         public void onUnInitializedFollowerSnapshotReply(RaftActor raftActor, UnInitializedFollowerSnapshotReply reply) {
169             LOG.debug("onUnInitializedFollowerSnapshotReply was called in state {}", this);
170         }
171
172         @Override
173         public void onApplyState(RaftActor raftActor, ApplyState applyState) {
174             LOG.debug("onApplyState was called in state {}", this);
175         }
176
177         @Override
178         public void onSnapshotComplete(RaftActor raftActor) {
179         }
180
181         protected void persistNewServerConfiguration(RaftActor raftActor, ServerOperationContext<?> operationContext){
182             raftContext.setDynamicServerConfigurationInUse();
183             ServerConfigurationPayload payload = raftContext.getPeerServerInfo();
184             LOG.debug("{}: New server configuration : {}", raftContext.getId(), payload.getServerConfig());
185
186             raftActor.persistData(operationContext.getClientRequestor(), operationContext.getContextId(), payload);
187
188             currentOperationState = new Persisting(operationContext);
189
190             sendReply(raftActor, operationContext, ServerChangeStatus.OK);
191         }
192
193         protected void operationComplete(RaftActor raftActor, ServerOperationContext<?> operationContext,
194                 @Nullable ServerChangeStatus replyStatus) {
195             if(replyStatus != null) {
196                 sendReply(raftActor, operationContext, replyStatus);
197             }
198
199             currentOperationState = IDLE;
200
201             ServerOperationContext<?> nextOperation = pendingOperationsQueue.poll();
202             if(nextOperation != null) {
203                 RaftActorServerConfigurationSupport.this.onNewOperation(raftActor, nextOperation);
204             }
205         }
206
207         private void sendReply(RaftActor raftActor, ServerOperationContext<?> operationContext,
208                 ServerChangeStatus status) {
209             LOG.debug("{}: Returning {} for operation {}", raftContext.getId(), status, operationContext.getOperation());
210
211             operationContext.getClientRequestor().tell(operationContext.newReply(status, raftActor.getLeaderId()),
212                     raftActor.self());
213         }
214
215         @Override
216         public String toString() {
217             return getClass().getSimpleName();
218         }
219     }
220
221     /**
222      * The state when no server operation is in progress. It immediately initiates new server operations.
223      */
224     private class Idle extends AbstractOperationState {
225         @Override
226         public void onNewOperation(RaftActor raftActor, ServerOperationContext<?> operationContext) {
227             operationContext.newInitialOperationState(RaftActorServerConfigurationSupport.this).initiate(raftActor);
228         }
229
230         @Override
231         public void onApplyState(RaftActor raftActor, ApplyState applyState) {
232             // Noop - we override b/c ApplyState is called normally for followers in the idle state.
233         }
234     }
235
236     /**
237      * The state when a new server configuration is being persisted and replicated.
238      */
239     private class Persisting extends AbstractOperationState {
240         private final ServerOperationContext<?> operationContext;
241
242         Persisting(ServerOperationContext<?> operationContext) {
243             this.operationContext = operationContext;
244         }
245
246         @Override
247         public void onApplyState(RaftActor raftActor, ApplyState applyState) {
248             // Sanity check - we could get an ApplyState from a previous operation that timed out so make
249             // sure it's meant for us.
250             if(operationContext.getContextId().equals(applyState.getIdentifier())) {
251                 LOG.info("{}: {} has been successfully replicated to a majority of followers",
252                         applyState.getReplicatedLogEntry().getData());
253
254                 operationComplete(raftActor, operationContext, null);
255             }
256         }
257     }
258
259     /**
260      * Abstract base class for an AddServer operation state.
261      */
262     private abstract class AddServerState extends AbstractOperationState {
263         private final AddServerContext addServerContext;
264
265         AddServerState(AddServerContext addServerContext) {
266             this.addServerContext = addServerContext;
267         }
268
269         AddServerContext getAddServerContext() {
270             return addServerContext;
271         }
272
273         Cancellable newInstallSnapshotTimer(RaftActor raftActor) {
274             return raftContext.getActorSystem().scheduler().scheduleOnce(
275                     new FiniteDuration(((raftContext.getConfigParams().getElectionTimeOutInterval().toMillis()) * 2),
276                             TimeUnit.MILLISECONDS), raftContext.getActor(),
277                             new FollowerCatchUpTimeout(addServerContext.getOperation().getNewServerId()),
278                             raftContext.getActorSystem().dispatcher(), raftContext.getActor());
279         }
280
281         void handleOnFollowerCatchupTimeout(RaftActor raftActor, FollowerCatchUpTimeout followerTimeout) {
282             String serverId = followerTimeout.getNewServerId();
283
284             LOG.debug("{}: onFollowerCatchupTimeout for new server {}", raftContext.getId(), serverId);
285
286             // cleanup
287             raftContext.removePeer(serverId);
288
289             boolean isLeader = raftActor.isLeader();
290             if(isLeader) {
291                 AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
292                 leader.removeFollower(serverId);
293             }
294
295             operationComplete(raftActor, getAddServerContext(),
296                     isLeader ? ServerChangeStatus.TIMEOUT : ServerChangeStatus.NO_LEADER);
297         }
298     }
299
300     /**
301      * The initial state for the AddServer operation. It adds the new follower as a peer and initiates
302      * snapshot capture, if necessary.
303      */
304     private class InitialAddServerState extends AddServerState implements InitialOperationState {
305         InitialAddServerState(AddServerContext addServerContext) {
306             super(addServerContext);
307         }
308
309         @Override
310         public void initiate(RaftActor raftActor) {
311             AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
312             AddServer addServer = getAddServerContext().getOperation();
313
314             LOG.debug("{}: Initiating {}", raftContext.getId(), addServer);
315
316             if(raftContext.getPeerInfo(addServer.getNewServerId()) != null) {
317                 operationComplete(raftActor, getAddServerContext(), ServerChangeStatus.ALREADY_EXISTS);
318                 return;
319             }
320
321             VotingState votingState = addServer.isVotingMember() ? VotingState.VOTING_NOT_INITIALIZED :
322                     VotingState.NON_VOTING;
323             raftContext.addToPeers(addServer.getNewServerId(), addServer.getNewServerAddress(), votingState);
324
325             leader.addFollower(addServer.getNewServerId());
326
327             if(votingState == VotingState.VOTING_NOT_INITIALIZED){
328                 // schedule the install snapshot timeout timer
329                 Cancellable installSnapshotTimer = newInstallSnapshotTimer(raftActor);
330                 if(leader.initiateCaptureSnapshot(addServer.getNewServerId())) {
331                     LOG.debug("{}: Initiating capture snapshot for new server {}", raftContext.getId(),
332                             addServer.getNewServerId());
333
334                     currentOperationState = new InstallingSnapshot(getAddServerContext(), installSnapshotTimer);
335                 } else {
336                     LOG.debug("{}: Snapshot already in progress - waiting for completion", raftContext.getId());
337
338                     currentOperationState = new WaitingForPriorSnapshotComplete(getAddServerContext(),
339                             installSnapshotTimer);
340                 }
341             } else {
342                 LOG.debug("{}: New follower is non-voting - directly persisting new server configuration",
343                         raftContext.getId());
344
345                 persistNewServerConfiguration(raftActor, getAddServerContext());
346             }
347         }
348     }
349
350     /**
351      * The AddServer operation state for when the catch-up snapshot is being installed. It handles successful
352      * reply or timeout.
353      */
354     private class InstallingSnapshot extends AddServerState {
355         private final Cancellable installSnapshotTimer;
356
357         InstallingSnapshot(AddServerContext addServerContext, Cancellable installSnapshotTimer) {
358             super(addServerContext);
359             this.installSnapshotTimer = Preconditions.checkNotNull(installSnapshotTimer);
360         }
361
362         @Override
363         public void onFollowerCatchupTimeout(RaftActor raftActor, FollowerCatchUpTimeout followerTimeout) {
364             handleOnFollowerCatchupTimeout(raftActor, followerTimeout);
365
366             LOG.warn("{}: Timeout occured for new server {} while installing snapshot", raftContext.getId(),
367                     followerTimeout.getNewServerId());
368         }
369
370         @Override
371         public void onUnInitializedFollowerSnapshotReply(RaftActor raftActor, UnInitializedFollowerSnapshotReply reply) {
372             LOG.debug("{}: onUnInitializedFollowerSnapshotReply: {}", raftContext.getId(), reply);
373
374             String followerId = reply.getFollowerId();
375
376             // Sanity check to guard against receiving an UnInitializedFollowerSnapshotReply from a prior
377             // add server operation that timed out.
378             if(getAddServerContext().getOperation().getNewServerId().equals(followerId) && raftActor.isLeader()) {
379                 AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
380                 raftContext.getPeerInfo(followerId).setVotingState(VotingState.VOTING);
381                 leader.updateMinReplicaCount();
382
383                 persistNewServerConfiguration(raftActor, getAddServerContext());
384
385                 installSnapshotTimer.cancel();
386             } else {
387                 LOG.debug("{}: Dropping UnInitializedFollowerSnapshotReply for server {}: {}",
388                         raftContext.getId(), followerId,
389                         !raftActor.isLeader() ? "not leader" : "server Id doesn't match");
390             }
391         }
392     }
393
394     /**
395      * The AddServer operation state for when there is a snapshot already in progress. When the current
396      * snapshot completes, it initiates an install snapshot.
397      */
398     private class WaitingForPriorSnapshotComplete extends AddServerState {
399         private final Cancellable snapshotTimer;
400
401         WaitingForPriorSnapshotComplete(AddServerContext addServerContext, Cancellable snapshotTimer) {
402             super(addServerContext);
403             this.snapshotTimer = Preconditions.checkNotNull(snapshotTimer);
404         }
405
406         @Override
407         public void onSnapshotComplete(RaftActor raftActor) {
408             LOG.debug("{}: onSnapshotComplete", raftContext.getId());
409
410             if(!raftActor.isLeader()) {
411                 LOG.debug("{}: No longer the leader", raftContext.getId());
412                 return;
413             }
414
415             AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
416             if(leader.initiateCaptureSnapshot(getAddServerContext().getOperation().getNewServerId())) {
417                 LOG.debug("{}: Initiating capture snapshot for new server {}", raftContext.getId(),
418                         getAddServerContext().getOperation().getNewServerId());
419
420                 currentOperationState = new InstallingSnapshot(getAddServerContext(),
421                         newInstallSnapshotTimer(raftActor));
422
423                 snapshotTimer.cancel();
424             }
425         }
426
427         @Override
428         public void onFollowerCatchupTimeout(RaftActor raftActor, FollowerCatchUpTimeout followerTimeout) {
429             handleOnFollowerCatchupTimeout(raftActor, followerTimeout);
430
431             LOG.warn("{}: Timeout occured for new server {} while waiting for prior snapshot to complete",
432                     raftContext.getId(), followerTimeout.getNewServerId());
433         }
434     }
435
436     /**
437      * Stores context information for a server operation.
438      *
439      * @param <T> the operation type
440      */
441     private static abstract class ServerOperationContext<T> {
442         private final T operation;
443         private final ActorRef clientRequestor;
444         private final String contextId;
445
446         ServerOperationContext(T operation, ActorRef clientRequestor){
447             this.operation = operation;
448             this.clientRequestor = clientRequestor;
449             contextId = UUID.randomUUID().toString();
450         }
451
452         String getContextId() {
453             return contextId;
454         }
455
456         T getOperation() {
457             return operation;
458         }
459
460         ActorRef getClientRequestor() {
461             return clientRequestor;
462         }
463
464         abstract Object newReply(ServerChangeStatus status, String leaderId);
465
466         abstract InitialOperationState newInitialOperationState(RaftActorServerConfigurationSupport support);
467     }
468
469     /**
470      * Stores context information for an AddServer operation.
471      */
472     private static class AddServerContext extends ServerOperationContext<AddServer> {
473         AddServerContext(AddServer addServer, ActorRef clientRequestor) {
474             super(addServer, clientRequestor);
475         }
476
477         @Override
478         Object newReply(ServerChangeStatus status, String leaderId) {
479             return new AddServerReply(status, leaderId);
480         }
481
482         @Override
483         InitialOperationState newInitialOperationState(RaftActorServerConfigurationSupport support) {
484             return support.new InitialAddServerState(this);
485         }
486     }
487 }