Bug 2187: AddServer unit test and bug fixes
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActorServerConfigurationSupport.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import akka.actor.ActorRef;
11 import akka.actor.ActorSelection;
12 import akka.actor.Cancellable;
13 import java.util.ArrayList;
14 import java.util.Collections;
15 import java.util.LinkedList;
16 import java.util.List;
17 import java.util.Queue;
18 import java.util.UUID;
19 import java.util.concurrent.TimeUnit;
20 import org.opendaylight.controller.cluster.raft.FollowerLogInformation.FollowerState;
21 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
22 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
23 import org.opendaylight.controller.cluster.raft.messages.AddServer;
24 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
25 import org.opendaylight.controller.cluster.raft.messages.FollowerCatchUpTimeout;
26 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
27 import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
28 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31 import scala.concurrent.duration.FiniteDuration;
32
33 /**
34  * Handles server configuration related messages for a RaftActor.
35  *
36  * @author Thomas Pantelis
37  */
38 class RaftActorServerConfigurationSupport {
39     private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupport.class);
40     private final RaftActorContext context;
41     // client follower queue
42     private final Queue<CatchupFollowerInfo> followerInfoQueue = new LinkedList<CatchupFollowerInfo>();
43     // timeout handle
44     private Cancellable followerTimeout = null;
45
46     RaftActorServerConfigurationSupport(RaftActorContext context) {
47         this.context = context;
48     }
49
50     boolean handleMessage(Object message, RaftActor raftActor, ActorRef sender) {
51         if(message instanceof AddServer) {
52             onAddServer((AddServer)message, raftActor, sender);
53             return true;
54         } else if (message instanceof FollowerCatchUpTimeout){
55             FollowerCatchUpTimeout followerTimeout  = (FollowerCatchUpTimeout)message;
56             // abort follower catchup on timeout
57             onFollowerCatchupTimeout(raftActor, sender, followerTimeout.getNewServerId());
58             return true;
59         } else if (message instanceof UnInitializedFollowerSnapshotReply){
60             // snapshot installation is successful
61             onUnInitializedFollowerSnapshotReply((UnInitializedFollowerSnapshotReply)message, raftActor,sender);
62             return true;
63         } else if(message instanceof ApplyState) {
64             return onApplyState((ApplyState) message, raftActor);
65         } else {
66             return false;
67         }
68     }
69
70     private boolean onApplyState(ApplyState applyState, RaftActor raftActor) {
71         Payload data = applyState.getReplicatedLogEntry().getData();
72         if(data instanceof ServerConfigurationPayload) {
73             CatchupFollowerInfo followerInfo = followerInfoQueue.peek();
74             if(followerInfo != null && followerInfo.getContextId().equals(applyState.getIdentifier())) {
75                 LOG.info("{} has been successfully replicated to a majority of followers", data);
76
77                 // respond ok to follower
78                 respondToClient(raftActor, ServerChangeStatus.OK);
79             }
80
81             return true;
82         }
83
84         return false;
85     }
86
87     private void onAddServer(AddServer addServer, RaftActor raftActor, ActorRef sender) {
88         LOG.debug("{}: onAddServer: {}", context.getId(), addServer);
89         if(noLeaderOrForwardedToLeader(addServer, raftActor, sender)) {
90             return;
91         }
92
93         CatchupFollowerInfo followerInfo = new CatchupFollowerInfo(addServer,sender);
94         boolean process = followerInfoQueue.isEmpty();
95         followerInfoQueue.add(followerInfo);
96         if(process) {
97             processAddServer(raftActor);
98         }
99     }
100
101     /**
102      * The algorithm for AddServer is as follows:
103      * <ul>
104      * <li>Add the new server as a peer.</li>
105      * <li>Add the new follower to the leader.</li>
106      * <li>If new server should be voting member</li>
107      * <ul>
108      *     <li>Initialize FollowerState to VOTING_NOT_INITIALIZED.</li>
109      *     <li>Initiate install snapshot to the new follower.</li>
110      *     <li>When install snapshot complete, mark the follower as VOTING and re-calculate majority vote count.</li>
111      * </ul>
112      * <li>Persist and replicate ServerConfigurationPayload with the new server list.</li>
113      * <li>On replication consensus, respond to caller with OK.</li>
114      * </ul>
115      * If the install snapshot times out after a period of 2 * election time out
116      * <ul>
117      *     <li>Remove the new server as a peer.</li>
118      *     <li>Remove the new follower from the leader.</li>
119      *     <li>Respond to caller with TIMEOUT.</li>
120      * </ul>
121      */
122     private void processAddServer(RaftActor raftActor){
123         LOG.debug("{}: In processAddServer", context.getId());
124
125         AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
126         CatchupFollowerInfo followerInfo = followerInfoQueue.peek();
127         AddServer addSrv = followerInfo.getAddServer();
128         context.addToPeers(addSrv.getNewServerId(), addSrv.getNewServerAddress());
129
130         // if voting member - initialize to VOTING_NOT_INITIALIZED
131         FollowerState initialState = addSrv.isVotingMember() ? FollowerState.VOTING_NOT_INITIALIZED :
132             FollowerState.NON_VOTING;
133         leader.addFollower(addSrv.getNewServerId(), initialState);
134
135         if(initialState == FollowerState.VOTING_NOT_INITIALIZED){
136             LOG.debug("Leader sending initiate capture snapshot to follower : {}", addSrv.getNewServerId());
137             leader.initiateCaptureSnapshot(addSrv.getNewServerId());
138             // schedule the catchup timeout timer
139             followerTimeout = context.getActorSystem().scheduler()
140                .scheduleOnce(new FiniteDuration(((context.getConfigParams().getElectionTimeOutInterval().toMillis()) * 2),
141                 TimeUnit.MILLISECONDS),
142                 context.getActor(), new FollowerCatchUpTimeout(addSrv.getNewServerId()),
143                 context.getActorSystem().dispatcher(), context.getActor());
144         } else {
145             LOG.debug("Directly persisting  the new server configuration : {}", addSrv.getNewServerId());
146             persistNewServerConfiguration(raftActor, followerInfo);
147         }
148     }
149
150     private boolean noLeaderOrForwardedToLeader(Object message, RaftActor raftActor, ActorRef sender) {
151         if (raftActor.isLeader()) {
152             return false;
153         }
154
155         ActorSelection leader = raftActor.getLeader();
156         if (leader != null) {
157             LOG.debug("Not leader - forwarding to leader {}", leader);
158             leader.forward(message, raftActor.getContext());
159         } else {
160             LOG.debug("No leader - returning NO_LEADER AddServerReply");
161             sender.tell(new AddServerReply(ServerChangeStatus.NO_LEADER, null), raftActor.self());
162         }
163
164         return true;
165     }
166
167     private void onUnInitializedFollowerSnapshotReply(UnInitializedFollowerSnapshotReply reply,
168                                                  RaftActor raftActor, ActorRef sender){
169         CatchupFollowerInfo followerInfo = followerInfoQueue.peek();
170         // Sanity check - it's possible we get a reply after it timed out.
171         if(followerInfo == null) {
172             return;
173         }
174
175         String followerId = reply.getFollowerId();
176         AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
177         FollowerLogInformation followerLogInformation = leader.getFollower(followerId);
178         stopFollowerTimer();
179         followerLogInformation.setFollowerState(FollowerState.VOTING);
180         leader.updateMinReplicaCountAndMinIsolatedLeaderPeerCount();
181
182         persistNewServerConfiguration(raftActor, followerInfo);
183     }
184
185     private void persistNewServerConfiguration(RaftActor raftActor, CatchupFollowerInfo followerInfo){
186         List <String> cNew = new ArrayList<String>(context.getPeerAddresses().keySet());
187         cNew.add(context.getId());
188
189         LOG.debug("New server configuration : {}",  cNew.toString());
190
191         ServerConfigurationPayload servPayload = new ServerConfigurationPayload(cNew, Collections.<String>emptyList());
192
193         raftActor.persistData(followerInfo.getClientRequestor(), followerInfo.getContextId(), servPayload);
194    }
195
196    private void stopFollowerTimer() {
197         if (followerTimeout != null && !followerTimeout.isCancelled()) {
198             followerTimeout.cancel();
199         }
200    }
201
202    private void onFollowerCatchupTimeout(RaftActor raftActor, ActorRef sender, String serverId){
203         LOG.debug("onFollowerCatchupTimeout: {}",  serverId);
204         AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
205         // cleanup
206         context.removePeer(serverId);
207         leader.removeFollower(serverId);
208         LOG.warn("Timeout occured for new server {} while installing snapshot", serverId);
209         respondToClient(raftActor,ServerChangeStatus.TIMEOUT);
210    }
211
212    private void respondToClient(RaftActor raftActor, ServerChangeStatus result){
213         // remove the entry from the queue
214         CatchupFollowerInfo fInfo = followerInfoQueue.remove();
215
216         // get the sender
217         ActorRef toClient = fInfo.getClientRequestor();
218
219         toClient.tell(new AddServerReply(result, raftActor.getLeaderId()), raftActor.self());
220         LOG.debug("Response returned is {} for server {} ",  result, fInfo.getAddServer().getNewServerId());
221         if(!followerInfoQueue.isEmpty()){
222             processAddServer(raftActor);
223         }
224    }
225
226     // maintain sender actorRef
227     private static class CatchupFollowerInfo {
228         private final AddServer addServer;
229         private final ActorRef clientRequestor;
230         private final String contextId;
231
232         CatchupFollowerInfo(AddServer addSrv, ActorRef cliReq){
233             addServer = addSrv;
234             clientRequestor = cliReq;
235             contextId = UUID.randomUUID().toString();
236         }
237
238         String getContextId() {
239             return contextId;
240         }
241
242         AddServer getAddServer(){
243             return addServer;
244         }
245
246         ActorRef getClientRequestor(){
247             return clientRequestor;
248         }
249     }
250 }