BUG-2187: Add Server - Leader Implementation
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActorServerConfigurationSupport.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import java.util.ArrayList;
11 import java.util.LinkedList;
12 import java.util.List;
13 import java.util.Map;
14 import java.util.Queue;
15 import java.util.concurrent.TimeUnit;
16 import akka.actor.ActorRef;
17 import akka.actor.ActorSelection;
18 import akka.actor.Cancellable;
19 import org.opendaylight.controller.cluster.raft.FollowerLogInformation.FollowerState;
20 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
21 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
22 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
23 import org.opendaylight.controller.cluster.raft.messages.AddServer;
24 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
25 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28 import org.opendaylight.controller.cluster.raft.messages.FollowerCatchUpTimeout;
29 import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
30 import scala.concurrent.duration.FiniteDuration;
31
32 /**
33  * Handles server configuration related messages for a RaftActor.
34  *
35  * @author Thomas Pantelis
36  */
37 class RaftActorServerConfigurationSupport {
38     private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupport.class);
39     private final RaftActorContext context;
40     // client follower queue
41     private final Queue<CatchupFollowerInfo> followerInfoQueue = new LinkedList<CatchupFollowerInfo>();
42     // timeout handle
43     private Cancellable followerTimeout = null;
44
45     RaftActorServerConfigurationSupport(RaftActorContext context) {
46         this.context = context;
47     }
48
49     boolean handleMessage(Object message, RaftActor raftActor, ActorRef sender) {
50         if(message instanceof AddServer) {
51             onAddServer((AddServer)message, raftActor, sender);
52             return true;
53         } else if (message instanceof FollowerCatchUpTimeout){
54             FollowerCatchUpTimeout followerTimeout  = (FollowerCatchUpTimeout)message;
55             // abort follower catchup on timeout
56             onFollowerCatchupTimeout(raftActor, sender, followerTimeout.getNewServerId());
57             return true;
58         } else if (message instanceof UnInitializedFollowerSnapshotReply){
59             // snapshot installation is successful
60             onUnInitializedFollowerSnapshotReply((UnInitializedFollowerSnapshotReply)message, raftActor,sender);
61             return true;
62         } else if(message instanceof ApplyState){
63             ApplyState applyState = (ApplyState) message;
64             Payload data = applyState.getReplicatedLogEntry().getData();
65             if( data instanceof ServerConfigurationPayload){
66                  LOG.info("Server configuration : {} has been replicated to a majority of cluster servers succesfully",
67                                                                                     (ServerConfigurationPayload)data);
68                  // respond ok to follower
69                  respondToClient(raftActor, ServerChangeStatus.OK);
70                  return true;
71             }
72             return false;
73         } else {
74             return false;
75         }
76     }
77
78     private void onAddServer(AddServer addServer, RaftActor raftActor, ActorRef sender) {
79         LOG.debug("onAddServer: {}", addServer);
80         if(noLeaderOrForwardedToLeader(addServer, raftActor, sender)) {
81             return;
82         }
83
84         CatchupFollowerInfo followerInfo = new CatchupFollowerInfo(addServer,sender);
85         boolean process = !followerInfoQueue.isEmpty();
86         followerInfoQueue.add(followerInfo);
87         if(process) {
88             processAddServer(raftActor);
89         }
90     }
91
92     private void processAddServer(RaftActor raftActor){
93         LOG.debug("In processAddServer");
94         AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
95         AddServer addSrv = followerInfoQueue.peek().getAddServer();
96         context.addToPeers(addSrv.getNewServerId(), addSrv.getNewServerAddress());
97
98         // if voting member - initialize to VOTING_NOT_INITIALIZED
99         FollowerState initialState = addSrv.isVotingMember() ? FollowerState.VOTING_NOT_INITIALIZED :
100             FollowerState.NON_VOTING;
101         leader.addFollower(addSrv.getNewServerId(), initialState);
102
103         // TODO
104         // if initialState == FollowerState.VOTING_NOT_INITIALIZED
105         //     Initiate snapshot via leader.initiateCaptureSnapshot(addServer.getNewServerId())
106         //     Start a timer to abort the operation after a period of time (maybe 2 times election timeout)
107         //     Set local instance state and wait for message from the AbstractLeader when install snapshot
108         //     is done and return now
109         //     When install snapshot message is received, go to step 1
110         // else
111         //     go to step 2
112         //
113         // 1) tell AbstractLeader mark the follower as VOTING and recalculate minReplicationCount and
114         //        minIsolatedLeaderPeerCount
115         // 2) persist and replicate ServerConfigurationPayload via
116         //           raftActor.persistData(sender, uuid, newServerConfigurationPayload)
117         // 3) Wait for commit complete via ApplyState message in RaftActor or time it out. In RaftActor,
118         //       on ApplyState, check if ReplicatedLogEntry payload is ServerConfigurationPayload and call
119         //       this class.
120         //
121         if(initialState == FollowerState.VOTING_NOT_INITIALIZED){
122             LOG.debug("Leader sending initiate capture snapshot to follower : {}", addSrv.getNewServerId());
123             leader.initiateCaptureSnapshot(addSrv.getNewServerId());
124             // schedule the catchup timeout timer
125             followerTimeout = context.getActorSystem().scheduler()
126                .scheduleOnce(new FiniteDuration(((context.getConfigParams().getElectionTimeOutInterval().toMillis()) * 2),
127                 TimeUnit.MILLISECONDS),
128                 context.getActor(), new FollowerCatchUpTimeout(addSrv.getNewServerId()),
129                 context.getActorSystem().dispatcher(), context.getActor());
130         } else {
131             LOG.debug("Directly persisting  the new server configuration : {}", addSrv.getNewServerId());
132             persistNewServerConfiguration(raftActor, followerInfoQueue.peek().getClientRequestor(),
133                                                                                  addSrv.getNewServerId());
134         }
135     }
136
137     private boolean noLeaderOrForwardedToLeader(Object message, RaftActor raftActor, ActorRef sender) {
138         if (raftActor.isLeader()) {
139             return false;
140         }
141
142         ActorSelection leader = raftActor.getLeader();
143         if (leader != null) {
144             LOG.debug("Not leader - forwarding to leader {}", leader);
145             leader.forward(message, raftActor.getContext());
146         } else {
147             LOG.debug("No leader - returning NO_LEADER AddServerReply");
148             sender.tell(new AddServerReply(ServerChangeStatus.NO_LEADER, null), raftActor.self());
149         }
150
151         return true;
152     }
153
154     private void onUnInitializedFollowerSnapshotReply(UnInitializedFollowerSnapshotReply reply,
155                                                  RaftActor raftActor, ActorRef sender){
156
157         String followerId = reply.getFollowerId();
158         AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
159         FollowerLogInformation followerLogInformation = leader.getFollower(followerId);
160         stopFollowerTimer();
161         followerLogInformation.setFollowerState(FollowerState.VOTING);
162         leader.updateMinReplicaCountAndMinIsolatedLeaderPeerCount();
163         persistNewServerConfiguration(raftActor, sender, followerId);
164     }
165
166     private void persistNewServerConfiguration(RaftActor raftActor, ActorRef sender, String followerId){
167         /* get old server configuration list */
168         Map<String, String> tempMap =  context.getPeerAddresses();
169         List<String> cOld = new ArrayList<String>();
170         for (Map.Entry<String, String> entry : tempMap.entrySet()) {
171             if(!entry.getKey().equals(followerId)){
172                 cOld.add(entry.getKey());
173             }
174         }
175         LOG.debug("Cold server configuration : {}",  cOld.toString());
176         /* get new server configuration list */
177         List <String> cNew = new ArrayList<String>(cOld);
178         cNew.add(followerId);
179         LOG.debug("Cnew server configuration : {}",  cNew.toString());
180         // construct the peer list
181         ServerConfigurationPayload servPayload = new ServerConfigurationPayload(cOld, cNew);
182         /* TODO - persist new configuration - CHECK WHETHER USING getId below is correct */
183         raftActor.persistData(sender, context.getId(), servPayload);
184    }
185
186    private void stopFollowerTimer() {
187         if (followerTimeout != null && !followerTimeout.isCancelled()) {
188             followerTimeout.cancel();
189         }
190    }
191
192    private void onFollowerCatchupTimeout(RaftActor raftActor, ActorRef sender, String serverId){
193
194         LOG.debug("onFollowerCatchupTimeout: {}",  serverId);
195         AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
196         // cleanup
197         context.removePeer(serverId);
198         leader.removeFollower(serverId);
199         LOG.warn("onFollowerCatchupTimeout - Timeout occured for server - {} while installing snapshot", serverId);
200         respondToClient(raftActor,ServerChangeStatus.TIMEOUT);
201    }
202
203    private void respondToClient(RaftActor raftActor, ServerChangeStatus result){
204
205         int size = followerInfoQueue.size();
206
207         // remove the entry from the queue
208         CatchupFollowerInfo fInfo = followerInfoQueue.remove();
209         // get the sender
210         ActorRef toClient = fInfo.getClientRequestor();
211
212         toClient.tell(new AddServerReply(result, raftActor.getLeaderId()), raftActor.self());
213         LOG.debug("Response returned is {} for server {} ",  result, fInfo.getAddServer().getNewServerId());
214         if(!followerInfoQueue.isEmpty()){
215             processAddServer(raftActor);
216         }
217    }
218
219     // mantain sender actorRef
220     private class CatchupFollowerInfo {
221         private final AddServer addServer;
222         private final ActorRef clientRequestor;
223
224         CatchupFollowerInfo(AddServer addSrv, ActorRef cliReq){
225             addServer = addSrv;
226             clientRequestor = cliReq;
227         }
228         public AddServer getAddServer(){
229             return addServer;
230         }
231         public ActorRef getClientRequestor(){
232             return clientRequestor;
233         }
234     }
235 }