Add getPeerIds to RaftActorContext
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActorServerConfigurationSupport.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import akka.actor.ActorRef;
11 import akka.actor.ActorSelection;
12 import akka.actor.Cancellable;
13 import com.google.common.base.Preconditions;
14 import java.util.ArrayList;
15 import java.util.Collections;
16 import java.util.LinkedList;
17 import java.util.List;
18 import java.util.Queue;
19 import java.util.UUID;
20 import java.util.concurrent.TimeUnit;
21 import org.opendaylight.controller.cluster.raft.FollowerLogInformation.FollowerState;
22 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
23 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
24 import org.opendaylight.controller.cluster.raft.messages.AddServer;
25 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
26 import org.opendaylight.controller.cluster.raft.messages.FollowerCatchUpTimeout;
27 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
28 import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
29 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32 import scala.concurrent.duration.FiniteDuration;
33
34 /**
35  * Handles server configuration related messages for a RaftActor.
36  *
37  * @author Thomas Pantelis
38  */
39 class RaftActorServerConfigurationSupport {
    // Class-wide SLF4J logger.
    private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupport.class);

    // The single Idle state instance. It is an instance field rather than a static constant because
    // Idle is a non-static inner class bound to this support instance.
    private final OperationState IDLE = new Idle();

    // Context of the owning RaftActor; read here for the actor id, peers, config params and scheduler.
    private final RaftActorContext raftContext;

    // Operations received while another operation is in progress; drained in FIFO order (add/poll).
    private final Queue<ServerOperationContext<?>> pendingOperationsQueue = new LinkedList<>();

    // Current state of the server operation FSM; starts out idle. Must be declared after IDLE so that
    // IDLE is already initialized when this initializer runs.
    private OperationState currentOperationState = IDLE;
49
50     RaftActorServerConfigurationSupport(RaftActorContext context) {
51         this.raftContext = context;
52     }
53
54     boolean handleMessage(Object message, RaftActor raftActor, ActorRef sender) {
55         if(message instanceof AddServer) {
56             onAddServer((AddServer)message, raftActor, sender);
57             return true;
58         } else if (message instanceof FollowerCatchUpTimeout) {
59             currentOperationState.onFollowerCatchupTimeout(raftActor, (FollowerCatchUpTimeout)message);
60             return true;
61         } else if (message instanceof UnInitializedFollowerSnapshotReply) {
62             currentOperationState.onUnInitializedFollowerSnapshotReply(raftActor,
63                     (UnInitializedFollowerSnapshotReply)message);
64             return true;
65         } else if(message instanceof ApplyState) {
66             return onApplyState((ApplyState) message, raftActor);
67         } else {
68             return false;
69         }
70     }
71
72     private boolean onApplyState(ApplyState applyState, RaftActor raftActor) {
73         Payload data = applyState.getReplicatedLogEntry().getData();
74         if(data instanceof ServerConfigurationPayload) {
75             currentOperationState.onApplyState(raftActor, applyState);
76             return true;
77         }
78
79         return false;
80     }
81
82     /**
83      * The algorithm for AddServer is as follows:
84      * <ul>
85      * <li>Add the new server as a peer.</li>
86      * <li>Add the new follower to the leader.</li>
87      * <li>If new server should be voting member</li>
88      * <ul>
89      *     <li>Initialize FollowerState to VOTING_NOT_INITIALIZED.</li>
90      *     <li>Initiate install snapshot to the new follower.</li>
91      *     <li>When install snapshot complete, mark the follower as VOTING and re-calculate majority vote count.</li>
92      * </ul>
93      * <li>Persist and replicate ServerConfigurationPayload with the new server list.</li>
94      * <li>On replication consensus, respond to caller with OK.</li>
95      * </ul>
96      * If the install snapshot times out after a period of 2 * election time out
97      * <ul>
98      *     <li>Remove the new server as a peer.</li>
99      *     <li>Remove the new follower from the leader.</li>
100      *     <li>Respond to caller with TIMEOUT.</li>
101      * </ul>
102      */
103     private void onAddServer(AddServer addServer, RaftActor raftActor, ActorRef sender) {
104         LOG.debug("{}: onAddServer: {}", raftContext.getId(), addServer);
105
106         onNewOperation(raftActor, new AddServerContext(addServer, sender));
107     }
108
109     private void onNewOperation(RaftActor raftActor, ServerOperationContext<?> operationContext) {
110         if (raftActor.isLeader()) {
111             currentOperationState.onNewOperation(raftActor, operationContext);
112         } else {
113             ActorSelection leader = raftActor.getLeader();
114             if (leader != null) {
115                 LOG.debug("{}: Not leader - forwarding to leader {}", raftContext.getId(), leader);
116                 leader.forward(operationContext.getOperation(), raftActor.getContext());
117             } else {
118                 LOG.debug("{}: No leader - returning NO_LEADER reply", raftContext.getId());
119                 operationContext.getClientRequestor().tell(operationContext.newReply(
120                         ServerChangeStatus.NO_LEADER, null), raftActor.self());
121             }
122         }
123     }
124
    /**
     * Interface for a server operation FSM state. The enclosing class delegates each relevant message
     * to whichever state is current at the time the message arrives.
     */
    private interface OperationState {
        /**
         * Called when a new server operation is submitted while this actor is the leader.
         */
        void onNewOperation(RaftActor raftActor, ServerOperationContext<?> operationContext);

        /**
         * Called when a FollowerCatchUpTimeout message is received.
         */
        void onFollowerCatchupTimeout(RaftActor raftActor, FollowerCatchUpTimeout followerTimeout);

        /**
         * Called when an UnInitializedFollowerSnapshotReply message is received.
         */
        void onUnInitializedFollowerSnapshotReply(RaftActor raftActor, UnInitializedFollowerSnapshotReply reply);

        /**
         * Called when an ApplyState message whose log entry carries a ServerConfigurationPayload is received.
         */
        void onApplyState(RaftActor raftActor, ApplyState applyState);
    }
137
    /**
     * Interface for the initial state for a server operation. An instance is obtained from
     * ServerOperationContext#newInitialOperationState when an operation is started from the idle state.
     */
    private interface InitialOperationState {
        /**
         * Starts the operation.
         *
         * @param raftActor the RaftActor on which the operation is performed
         */
        void initiate(RaftActor raftActor);
    }
144
145     /**
146      * Abstract base class for server operation FSM state. Handles common behavior for all states.
147      */
148     private abstract class AbstractOperationState implements OperationState {
149         @Override
150         public void onNewOperation(RaftActor raftActor, ServerOperationContext<?> operationContext) {
151             // We're currently processing another operation so queue it to be processed later.
152
153             LOG.debug("{}: Server operation already in progress - queueing {}", raftContext.getId(),
154                     operationContext.getOperation());
155
156             pendingOperationsQueue.add(operationContext);
157         }
158
159         @Override
160         public void onFollowerCatchupTimeout(RaftActor raftActor, FollowerCatchUpTimeout followerTimeout) {
161             LOG.debug("onFollowerCatchupTimeout should not be called in state {}", this);
162         }
163
164         @Override
165         public void onUnInitializedFollowerSnapshotReply(RaftActor raftActor, UnInitializedFollowerSnapshotReply reply) {
166             LOG.debug("onUnInitializedFollowerSnapshotReply was called in state {}", this);
167         }
168
169         @Override
170         public void onApplyState(RaftActor raftActor, ApplyState applyState) {
171             LOG.debug("onApplyState was called in state {}", this);
172         }
173
174         protected void persistNewServerConfiguration(RaftActor raftActor, ServerOperationContext<?> operationContext){
175             List <String> newConfig = new ArrayList<String>(raftContext.getPeerIds());
176             newConfig.add(raftContext.getId());
177
178             LOG.debug("{}: New server configuration : {}", raftContext.getId(), newConfig);
179
180             ServerConfigurationPayload payload = new ServerConfigurationPayload(newConfig, Collections.<String>emptyList());
181
182             raftActor.persistData(operationContext.getClientRequestor(), operationContext.getContextId(), payload);
183
184             currentOperationState = new Persisting(operationContext);
185         }
186
187         protected void operationComplete(RaftActor raftActor, ServerOperationContext<?> operationContext,
188                 ServerChangeStatus status) {
189
190             LOG.debug("{}: Returning {} for operation {}", raftContext.getId(), status, operationContext.getOperation());
191
192             operationContext.getClientRequestor().tell(operationContext.newReply(status, raftActor.getLeaderId()),
193                     raftActor.self());
194
195             currentOperationState = IDLE;
196
197             ServerOperationContext<?> nextOperation = pendingOperationsQueue.poll();
198             if(nextOperation != null) {
199                 RaftActorServerConfigurationSupport.this.onNewOperation(raftActor, nextOperation);
200             }
201         }
202
203         @Override
204         public String toString() {
205             return getClass().getSimpleName();
206         }
207     }
208
209     /**
210      * The state when no server operation is in progress. It immediately initiates new server operations.
211      */
212     private class Idle extends AbstractOperationState {
213         @Override
214         public void onNewOperation(RaftActor raftActor, ServerOperationContext<?> operationContext) {
215             operationContext.newInitialOperationState(RaftActorServerConfigurationSupport.this).initiate(raftActor);
216         }
217
218         @Override
219         public void onApplyState(RaftActor raftActor, ApplyState applyState) {
220             // Noop - we override b/c ApplyState is called normally for followers in the idle state.
221         }
222     }
223
224     /**
225      * The state when a new server configuration is being persisted and replicated.
226      */
227     private class Persisting extends AbstractOperationState {
228         private final ServerOperationContext<?> operationContext;
229
230         Persisting(ServerOperationContext<?> operationContext) {
231             this.operationContext = operationContext;
232         }
233
234         @Override
235         public void onApplyState(RaftActor raftActor, ApplyState applyState) {
236             // Sanity check - we could get an ApplyState from a previous operation that timed out so make
237             // sure it's meant for us.
238             if(operationContext.getContextId().equals(applyState.getIdentifier())) {
239                 LOG.info("{}: {} has been successfully replicated to a majority of followers",
240                         applyState.getReplicatedLogEntry().getData());
241
242                 operationComplete(raftActor, operationContext, ServerChangeStatus.OK);
243             }
244         }
245     }
246
247     /**
248      * Abstract base class for an AddServer operation state.
249      */
250     private abstract class AddServerState extends AbstractOperationState {
251         private final AddServerContext addServerContext;
252
253         AddServerState(AddServerContext addServerContext) {
254             this.addServerContext = addServerContext;
255         }
256
257         AddServerContext getAddServerContext() {
258             return addServerContext;
259         }
260     }
261
262     /**
263      * The initial state for the AddServer operation. It adds the new follower as a peer and initiates
264      * snapshot capture, if necessary.
265      */
266     private class InitialAddServerState extends AddServerState implements InitialOperationState {
267         InitialAddServerState(AddServerContext addServerContext) {
268             super(addServerContext);
269         }
270
271         @Override
272         public void initiate(RaftActor raftActor) {
273             AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
274
275             AddServer addServer = getAddServerContext().getOperation();
276
277             LOG.debug("{}: Initiating {}", raftContext.getId(), addServer);
278
279             raftContext.addToPeers(addServer.getNewServerId(), addServer.getNewServerAddress());
280
281             // if voting member - initialize to VOTING_NOT_INITIALIZED
282             FollowerState initialState = addServer.isVotingMember() ? FollowerState.VOTING_NOT_INITIALIZED :
283                 FollowerState.NON_VOTING;
284             leader.addFollower(addServer.getNewServerId(), initialState);
285
286             if(initialState == FollowerState.VOTING_NOT_INITIALIZED){
287                 LOG.debug("{}: Leader sending initiate capture snapshot to new follower {}", raftContext.getId(),
288                         addServer.getNewServerId());
289
290                 leader.initiateCaptureSnapshot(addServer.getNewServerId());
291
292                 // schedule the install snapshot timeout timer
293                 Cancellable installSnapshotTimer = raftContext.getActorSystem().scheduler().scheduleOnce(
294                         new FiniteDuration(((raftContext.getConfigParams().getElectionTimeOutInterval().toMillis()) * 2),
295                                 TimeUnit.MILLISECONDS), raftContext.getActor(),
296                                 new FollowerCatchUpTimeout(addServer.getNewServerId()),
297                                 raftContext.getActorSystem().dispatcher(), raftContext.getActor());
298
299                 currentOperationState = new InstallingSnapshot(getAddServerContext(), installSnapshotTimer);
300             } else {
301                 LOG.debug("{}: New follower is non-voting - directly persisting new server configuration",
302                         raftContext.getId());
303
304                 persistNewServerConfiguration(raftActor, getAddServerContext());
305             }
306         }
307     }
308
309     /**
310      * The AddServer operation state for when the catch-up snapshot is being installed. It handles successful
311      * reply or timeout.
312      */
313     private class InstallingSnapshot extends AddServerState {
314         private final Cancellable installSnapshotTimer;
315
316         InstallingSnapshot(AddServerContext addServerContext, Cancellable installSnapshotTimer) {
317             super(addServerContext);
318             this.installSnapshotTimer = Preconditions.checkNotNull(installSnapshotTimer);
319         }
320
321         @Override
322         public void onFollowerCatchupTimeout(RaftActor raftActor, FollowerCatchUpTimeout followerTimeout) {
323             String serverId = followerTimeout.getNewServerId();
324
325             LOG.debug("{}: onFollowerCatchupTimeout: {}", raftContext.getId(), serverId);
326
327             AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
328
329             // cleanup
330             raftContext.removePeer(serverId);
331             leader.removeFollower(serverId);
332
333             LOG.warn("{}: Timeout occured for new server {} while installing snapshot", raftContext.getId(), serverId);
334
335             operationComplete(raftActor, getAddServerContext(), ServerChangeStatus.TIMEOUT);
336         }
337
338         @Override
339         public void onUnInitializedFollowerSnapshotReply(RaftActor raftActor, UnInitializedFollowerSnapshotReply reply) {
340             LOG.debug("{}: onUnInitializedFollowerSnapshotReply: {}", raftContext.getId(), reply);
341
342             String followerId = reply.getFollowerId();
343
344             // Sanity check to guard against receiving an UnInitializedFollowerSnapshotReply from a prior
345             // add server operation that timed out.
346             if(getAddServerContext().getOperation().getNewServerId().equals(followerId)) {
347                 AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
348                 FollowerLogInformation followerLogInformation = leader.getFollower(followerId);
349
350                 installSnapshotTimer.cancel();
351
352                 followerLogInformation.setFollowerState(FollowerState.VOTING);
353                 leader.updateMinReplicaCountAndMinIsolatedLeaderPeerCount();
354
355                 persistNewServerConfiguration(raftActor, getAddServerContext());
356             }
357         }
358     }
359
360     /**
361      * Stores context information for a server operation.
362      *
363      * @param <T> the operation type
364      */
365     private static abstract class ServerOperationContext<T> {
366         private final T operation;
367         private final ActorRef clientRequestor;
368         private final String contextId;
369
370         ServerOperationContext(T operation, ActorRef clientRequestor){
371             this.operation = operation;
372             this.clientRequestor = clientRequestor;
373             contextId = UUID.randomUUID().toString();
374         }
375
376         String getContextId() {
377             return contextId;
378         }
379
380         T getOperation() {
381             return operation;
382         }
383
384         ActorRef getClientRequestor() {
385             return clientRequestor;
386         }
387
388         abstract Object newReply(ServerChangeStatus status, String leaderId);
389
390         abstract InitialOperationState newInitialOperationState(RaftActorServerConfigurationSupport support);
391     }
392
393     /**
394      * Stores context information for an AddServer operation.
395      */
396     private static class AddServerContext extends ServerOperationContext<AddServer> {
397         AddServerContext(AddServer addServer, ActorRef clientRequestor) {
398             super(addServer, clientRequestor);
399         }
400
401         @Override
402         Object newReply(ServerChangeStatus status, String leaderId) {
403             return new AddServerReply(status, leaderId);
404         }
405
406         @Override
407         InitialOperationState newInitialOperationState(RaftActorServerConfigurationSupport support) {
408             return support.new InitialAddServerState(this);
409         }
410     }
411 }