Derive MockRaftActorContext from RaftActorContextImpl
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActorServerConfigurationSupport.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import akka.actor.ActorRef;
11 import akka.actor.ActorSelection;
12 import akka.actor.Cancellable;
13 import com.google.common.base.Preconditions;
14 import java.util.ArrayList;
15 import java.util.Collection;
16 import java.util.LinkedList;
17 import java.util.List;
18 import java.util.Queue;
19 import java.util.UUID;
20 import java.util.concurrent.TimeUnit;
21 import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
22 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
23 import org.opendaylight.controller.cluster.raft.base.messages.SnapshotComplete;
24 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
25 import org.opendaylight.controller.cluster.raft.messages.AddServer;
26 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
27 import org.opendaylight.controller.cluster.raft.messages.FollowerCatchUpTimeout;
28 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
29 import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
30 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33 import scala.concurrent.duration.FiniteDuration;
34
35 /**
36  * Handles server configuration related messages for a RaftActor.
37  *
38  * @author Thomas Pantelis
39  */
40 class RaftActorServerConfigurationSupport {
41     private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupport.class);
42
43     private final OperationState IDLE = new Idle();
44
45     private final RaftActorContext raftContext;
46
47     private final Queue<ServerOperationContext<?>> pendingOperationsQueue = new LinkedList<>();
48
49     private OperationState currentOperationState = IDLE;
50
51     RaftActorServerConfigurationSupport(RaftActorContext context) {
52         this.raftContext = context;
53     }
54
55     boolean handleMessage(Object message, RaftActor raftActor, ActorRef sender) {
56         if(message instanceof AddServer) {
57             onAddServer((AddServer)message, raftActor, sender);
58             return true;
59         } else if (message instanceof FollowerCatchUpTimeout) {
60             currentOperationState.onFollowerCatchupTimeout(raftActor, (FollowerCatchUpTimeout)message);
61             return true;
62         } else if (message instanceof UnInitializedFollowerSnapshotReply) {
63             currentOperationState.onUnInitializedFollowerSnapshotReply(raftActor,
64                     (UnInitializedFollowerSnapshotReply)message);
65             return true;
66         } else if(message instanceof ApplyState) {
67             return onApplyState((ApplyState) message, raftActor);
68         } else if(message instanceof SnapshotComplete) {
69             currentOperationState.onSnapshotComplete(raftActor);
70             return false;
71         } else {
72             return false;
73         }
74     }
75
76     private boolean onApplyState(ApplyState applyState, RaftActor raftActor) {
77         Payload data = applyState.getReplicatedLogEntry().getData();
78         if(data instanceof ServerConfigurationPayload) {
79             currentOperationState.onApplyState(raftActor, applyState);
80             return true;
81         }
82
83         return false;
84     }
85
86     /**
87      * The algorithm for AddServer is as follows:
88      * <ul>
89      * <li>Add the new server as a peer.</li>
90      * <li>Add the new follower to the leader.</li>
91      * <li>If new server should be voting member</li>
92      * <ul>
93      *     <li>Initialize FollowerState to VOTING_NOT_INITIALIZED.</li>
94      *     <li>Initiate install snapshot to the new follower.</li>
95      *     <li>When install snapshot complete, mark the follower as VOTING and re-calculate majority vote count.</li>
96      * </ul>
97      * <li>Persist and replicate ServerConfigurationPayload with the new server list.</li>
98      * <li>On replication consensus, respond to caller with OK.</li>
99      * </ul>
100      * If the install snapshot times out after a period of 2 * election time out
101      * <ul>
102      *     <li>Remove the new server as a peer.</li>
103      *     <li>Remove the new follower from the leader.</li>
104      *     <li>Respond to caller with TIMEOUT.</li>
105      * </ul>
106      */
107     private void onAddServer(AddServer addServer, RaftActor raftActor, ActorRef sender) {
108         LOG.debug("{}: onAddServer: {}", raftContext.getId(), addServer);
109
110         onNewOperation(raftActor, new AddServerContext(addServer, sender));
111     }
112
113     private void onNewOperation(RaftActor raftActor, ServerOperationContext<?> operationContext) {
114         if (raftActor.isLeader()) {
115             currentOperationState.onNewOperation(raftActor, operationContext);
116         } else {
117             ActorSelection leader = raftActor.getLeader();
118             if (leader != null) {
119                 LOG.debug("{}: Not leader - forwarding to leader {}", raftContext.getId(), leader);
120                 leader.forward(operationContext.getOperation(), raftActor.getContext());
121             } else {
122                 LOG.debug("{}: No leader - returning NO_LEADER reply", raftContext.getId());
123                 operationContext.getClientRequestor().tell(operationContext.newReply(
124                         ServerChangeStatus.NO_LEADER, null), raftActor.self());
125             }
126         }
127     }
128
129     /**
130      * Interface for a server operation FSM state.
131      */
132     private interface OperationState {
133         void onNewOperation(RaftActor raftActor, ServerOperationContext<?> operationContext);
134
135         void onFollowerCatchupTimeout(RaftActor raftActor, FollowerCatchUpTimeout followerTimeout);
136
137         void onUnInitializedFollowerSnapshotReply(RaftActor raftActor, UnInitializedFollowerSnapshotReply reply);
138
139         void onApplyState(RaftActor raftActor, ApplyState applyState);
140
141         void onSnapshotComplete(RaftActor raftActor);
142     }
143
144     /**
145      * Interface for the initial state for a server operation.
146      */
147     private interface InitialOperationState {
148         void initiate(RaftActor raftActor);
149     }
150
151     /**
152      * Abstract base class for server operation FSM state. Handles common behavior for all states.
153      */
154     private abstract class AbstractOperationState implements OperationState {
155         @Override
156         public void onNewOperation(RaftActor raftActor, ServerOperationContext<?> operationContext) {
157             // We're currently processing another operation so queue it to be processed later.
158
159             LOG.debug("{}: Server operation already in progress - queueing {}", raftContext.getId(),
160                     operationContext.getOperation());
161
162             pendingOperationsQueue.add(operationContext);
163         }
164
165         @Override
166         public void onFollowerCatchupTimeout(RaftActor raftActor, FollowerCatchUpTimeout followerTimeout) {
167             LOG.debug("onFollowerCatchupTimeout should not be called in state {}", this);
168         }
169
170         @Override
171         public void onUnInitializedFollowerSnapshotReply(RaftActor raftActor, UnInitializedFollowerSnapshotReply reply) {
172             LOG.debug("onUnInitializedFollowerSnapshotReply was called in state {}", this);
173         }
174
175         @Override
176         public void onApplyState(RaftActor raftActor, ApplyState applyState) {
177             LOG.debug("onApplyState was called in state {}", this);
178         }
179
180         @Override
181         public void onSnapshotComplete(RaftActor raftActor) {
182         }
183
184         protected void persistNewServerConfiguration(RaftActor raftActor, ServerOperationContext<?> operationContext){
185             Collection<PeerInfo> peers = raftContext.getPeers();
186             List<ServerInfo> newConfig = new ArrayList<>(peers.size() + 1);
187             for(PeerInfo peer: peers) {
188                 newConfig.add(new ServerInfo(peer.getId(), peer.isVoting()));
189             }
190
191             newConfig.add(new ServerInfo(raftContext.getId(), true));
192
193             LOG.debug("{}: New server configuration : {}", raftContext.getId(), newConfig);
194
195             ServerConfigurationPayload payload = new ServerConfigurationPayload(newConfig);
196
197             raftActor.persistData(operationContext.getClientRequestor(), operationContext.getContextId(), payload);
198
199             currentOperationState = new Persisting(operationContext);
200         }
201
202         protected void operationComplete(RaftActor raftActor, ServerOperationContext<?> operationContext,
203                 ServerChangeStatus status) {
204
205             LOG.debug("{}: Returning {} for operation {}", raftContext.getId(), status, operationContext.getOperation());
206
207             operationContext.getClientRequestor().tell(operationContext.newReply(status, raftActor.getLeaderId()),
208                     raftActor.self());
209
210             currentOperationState = IDLE;
211
212             ServerOperationContext<?> nextOperation = pendingOperationsQueue.poll();
213             if(nextOperation != null) {
214                 RaftActorServerConfigurationSupport.this.onNewOperation(raftActor, nextOperation);
215             }
216         }
217
218         @Override
219         public String toString() {
220             return getClass().getSimpleName();
221         }
222     }
223
224     /**
225      * The state when no server operation is in progress. It immediately initiates new server operations.
226      */
227     private class Idle extends AbstractOperationState {
228         @Override
229         public void onNewOperation(RaftActor raftActor, ServerOperationContext<?> operationContext) {
230             operationContext.newInitialOperationState(RaftActorServerConfigurationSupport.this).initiate(raftActor);
231         }
232
233         @Override
234         public void onApplyState(RaftActor raftActor, ApplyState applyState) {
235             // Noop - we override b/c ApplyState is called normally for followers in the idle state.
236         }
237     }
238
239     /**
240      * The state when a new server configuration is being persisted and replicated.
241      */
242     private class Persisting extends AbstractOperationState {
243         private final ServerOperationContext<?> operationContext;
244
245         Persisting(ServerOperationContext<?> operationContext) {
246             this.operationContext = operationContext;
247         }
248
249         @Override
250         public void onApplyState(RaftActor raftActor, ApplyState applyState) {
251             // Sanity check - we could get an ApplyState from a previous operation that timed out so make
252             // sure it's meant for us.
253             if(operationContext.getContextId().equals(applyState.getIdentifier())) {
254                 LOG.info("{}: {} has been successfully replicated to a majority of followers",
255                         applyState.getReplicatedLogEntry().getData());
256
257                 operationComplete(raftActor, operationContext, ServerChangeStatus.OK);
258             }
259         }
260     }
261
262     /**
263      * Abstract base class for an AddServer operation state.
264      */
265     private abstract class AddServerState extends AbstractOperationState {
266         private final AddServerContext addServerContext;
267
268         AddServerState(AddServerContext addServerContext) {
269             this.addServerContext = addServerContext;
270         }
271
272         AddServerContext getAddServerContext() {
273             return addServerContext;
274         }
275
276         Cancellable newInstallSnapshotTimer(RaftActor raftActor) {
277             return raftContext.getActorSystem().scheduler().scheduleOnce(
278                     new FiniteDuration(((raftContext.getConfigParams().getElectionTimeOutInterval().toMillis()) * 2),
279                             TimeUnit.MILLISECONDS), raftContext.getActor(),
280                             new FollowerCatchUpTimeout(addServerContext.getOperation().getNewServerId()),
281                             raftContext.getActorSystem().dispatcher(), raftContext.getActor());
282         }
283
284         void handleOnFollowerCatchupTimeout(RaftActor raftActor, FollowerCatchUpTimeout followerTimeout) {
285             String serverId = followerTimeout.getNewServerId();
286
287             LOG.debug("{}: onFollowerCatchupTimeout for new server {}", raftContext.getId(), serverId);
288
289             // cleanup
290             raftContext.removePeer(serverId);
291
292             boolean isLeader = raftActor.isLeader();
293             if(isLeader) {
294                 AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
295                 leader.removeFollower(serverId);
296             }
297
298             operationComplete(raftActor, getAddServerContext(),
299                     isLeader ? ServerChangeStatus.TIMEOUT : ServerChangeStatus.NO_LEADER);
300         }
301     }
302
303     /**
304      * The initial state for the AddServer operation. It adds the new follower as a peer and initiates
305      * snapshot capture, if necessary.
306      */
307     private class InitialAddServerState extends AddServerState implements InitialOperationState {
308         InitialAddServerState(AddServerContext addServerContext) {
309             super(addServerContext);
310         }
311
312         @Override
313         public void initiate(RaftActor raftActor) {
314             AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
315
316             AddServer addServer = getAddServerContext().getOperation();
317
318             LOG.debug("{}: Initiating {}", raftContext.getId(), addServer);
319
320             VotingState votingState = addServer.isVotingMember() ? VotingState.VOTING_NOT_INITIALIZED :
321                     VotingState.NON_VOTING;
322             raftContext.addToPeers(addServer.getNewServerId(), addServer.getNewServerAddress(), votingState);
323
324             leader.addFollower(addServer.getNewServerId());
325
326             if(votingState == VotingState.VOTING_NOT_INITIALIZED){
327                 // schedule the install snapshot timeout timer
328                 Cancellable installSnapshotTimer = newInstallSnapshotTimer(raftActor);
329                 if(leader.initiateCaptureSnapshot(addServer.getNewServerId())) {
330                     LOG.debug("{}: Initiating capture snapshot for new server {}", raftContext.getId(),
331                             addServer.getNewServerId());
332
333                     currentOperationState = new InstallingSnapshot(getAddServerContext(), installSnapshotTimer);
334                 } else {
335                     LOG.debug("{}: Snapshot already in progress - waiting for completion", raftContext.getId());
336
337                     currentOperationState = new WaitingForPriorSnapshotComplete(getAddServerContext(),
338                             installSnapshotTimer);
339                 }
340             } else {
341                 LOG.debug("{}: New follower is non-voting - directly persisting new server configuration",
342                         raftContext.getId());
343
344                 persistNewServerConfiguration(raftActor, getAddServerContext());
345             }
346         }
347     }
348
349     /**
350      * The AddServer operation state for when the catch-up snapshot is being installed. It handles successful
351      * reply or timeout.
352      */
353     private class InstallingSnapshot extends AddServerState {
354         private final Cancellable installSnapshotTimer;
355
356         InstallingSnapshot(AddServerContext addServerContext, Cancellable installSnapshotTimer) {
357             super(addServerContext);
358             this.installSnapshotTimer = Preconditions.checkNotNull(installSnapshotTimer);
359         }
360
361         @Override
362         public void onFollowerCatchupTimeout(RaftActor raftActor, FollowerCatchUpTimeout followerTimeout) {
363             handleOnFollowerCatchupTimeout(raftActor, followerTimeout);
364
365             LOG.warn("{}: Timeout occured for new server {} while installing snapshot", raftContext.getId(),
366                     followerTimeout.getNewServerId());
367         }
368
369         @Override
370         public void onUnInitializedFollowerSnapshotReply(RaftActor raftActor, UnInitializedFollowerSnapshotReply reply) {
371             LOG.debug("{}: onUnInitializedFollowerSnapshotReply: {}", raftContext.getId(), reply);
372
373             String followerId = reply.getFollowerId();
374
375             // Sanity check to guard against receiving an UnInitializedFollowerSnapshotReply from a prior
376             // add server operation that timed out.
377             if(getAddServerContext().getOperation().getNewServerId().equals(followerId) && raftActor.isLeader()) {
378                 AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
379                 raftContext.getPeerInfo(followerId).setVotingState(VotingState.VOTING);
380                 leader.updateMinReplicaCount();
381
382                 persistNewServerConfiguration(raftActor, getAddServerContext());
383
384                 installSnapshotTimer.cancel();
385             } else {
386                 LOG.debug("{}: Dropping UnInitializedFollowerSnapshotReply for server {}: {}",
387                         raftContext.getId(), followerId,
388                         !raftActor.isLeader() ? "not leader" : "server Id doesn't match");
389             }
390         }
391     }
392
393     /**
394      * The AddServer operation state for when there is a snapshot already in progress. When the current
395      * snapshot completes, it initiates an install snapshot.
396      */
397     private class WaitingForPriorSnapshotComplete extends AddServerState {
398         private final Cancellable snapshotTimer;
399
400         WaitingForPriorSnapshotComplete(AddServerContext addServerContext, Cancellable snapshotTimer) {
401             super(addServerContext);
402             this.snapshotTimer = Preconditions.checkNotNull(snapshotTimer);
403         }
404
405         @Override
406         public void onSnapshotComplete(RaftActor raftActor) {
407             LOG.debug("{}: onSnapshotComplete", raftContext.getId());
408
409             if(!raftActor.isLeader()) {
410                 LOG.debug("{}: No longer the leader", raftContext.getId());
411                 return;
412             }
413
414             AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
415             if(leader.initiateCaptureSnapshot(getAddServerContext().getOperation().getNewServerId())) {
416                 LOG.debug("{}: Initiating capture snapshot for new server {}", raftContext.getId(),
417                         getAddServerContext().getOperation().getNewServerId());
418
419                 currentOperationState = new InstallingSnapshot(getAddServerContext(),
420                         newInstallSnapshotTimer(raftActor));
421
422                 snapshotTimer.cancel();
423             }
424         }
425
426         @Override
427         public void onFollowerCatchupTimeout(RaftActor raftActor, FollowerCatchUpTimeout followerTimeout) {
428             handleOnFollowerCatchupTimeout(raftActor, followerTimeout);
429
430             LOG.warn("{}: Timeout occured for new server {} while waiting for prior snapshot to complete",
431                     raftContext.getId(), followerTimeout.getNewServerId());
432         }
433     }
434
435     /**
436      * Stores context information for a server operation.
437      *
438      * @param <T> the operation type
439      */
440     private static abstract class ServerOperationContext<T> {
441         private final T operation;
442         private final ActorRef clientRequestor;
443         private final String contextId;
444
445         ServerOperationContext(T operation, ActorRef clientRequestor){
446             this.operation = operation;
447             this.clientRequestor = clientRequestor;
448             contextId = UUID.randomUUID().toString();
449         }
450
451         String getContextId() {
452             return contextId;
453         }
454
455         T getOperation() {
456             return operation;
457         }
458
459         ActorRef getClientRequestor() {
460             return clientRequestor;
461         }
462
463         abstract Object newReply(ServerChangeStatus status, String leaderId);
464
465         abstract InitialOperationState newInitialOperationState(RaftActorServerConfigurationSupport support);
466     }
467
468     /**
469      * Stores context information for an AddServer operation.
470      */
471     private static class AddServerContext extends ServerOperationContext<AddServer> {
472         AddServerContext(AddServer addServer, ActorRef clientRequestor) {
473             super(addServer, clientRequestor);
474         }
475
476         @Override
477         Object newReply(ServerChangeStatus status, String leaderId) {
478             return new AddServerReply(status, leaderId);
479         }
480
481         @Override
482         InitialOperationState newInitialOperationState(RaftActorServerConfigurationSupport support) {
483             return support.new InitialAddServerState(this);
484         }
485     }
486 }