2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft;
10 import java.util.ArrayList;
11 import java.util.LinkedList;
12 import java.util.List;
14 import java.util.Queue;
15 import java.util.concurrent.TimeUnit;
16 import akka.actor.ActorRef;
17 import akka.actor.ActorSelection;
18 import akka.actor.Cancellable;
19 import org.opendaylight.controller.cluster.raft.FollowerLogInformation.FollowerState;
20 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
21 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
22 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
23 import org.opendaylight.controller.cluster.raft.messages.AddServer;
24 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
25 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28 import org.opendaylight.controller.cluster.raft.messages.FollowerCatchUpTimeout;
29 import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
30 import scala.concurrent.duration.FiniteDuration;
33 * Handles server configuration related messages for a RaftActor.
35 * @author Thomas Pantelis
37 class RaftActorServerConfigurationSupport {
private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupport.class);

/** Context of the owning RaftActor; used for peer bookkeeping and for scheduling the catch-up timer. */
private final RaftActorContext context;

/**
 * Pending AddServer requests in arrival order. The head entry is the one that
 * processAddServer works on; respondToClient removes it once a reply has been sent.
 */
private final Queue<CatchupFollowerInfo> followerInfoQueue = new LinkedList<CatchupFollowerInfo>();

/** Handle of the scheduled catch-up timeout; null until a timeout has been scheduled. */
private Cancellable followerTimeout;

/**
 * Creates the support object for one RaftActor.
 *
 * @param context the RaftActorContext this instance operates on
 */
RaftActorServerConfigurationSupport(RaftActorContext context) {
    this.context = context;
}
49 boolean handleMessage(Object message, RaftActor raftActor, ActorRef sender) {
50 if(message instanceof AddServer) {
51 onAddServer((AddServer)message, raftActor, sender);
53 } else if (message instanceof FollowerCatchUpTimeout){
54 FollowerCatchUpTimeout followerTimeout = (FollowerCatchUpTimeout)message;
55 // abort follower catchup on timeout
56 onFollowerCatchupTimeout(raftActor, sender, followerTimeout.getNewServerId());
58 } else if (message instanceof UnInitializedFollowerSnapshotReply){
59 // snapshot installation is successful
60 onUnInitializedFollowerSnapshotReply((UnInitializedFollowerSnapshotReply)message, raftActor,sender);
62 } else if(message instanceof ApplyState){
63 ApplyState applyState = (ApplyState) message;
64 Payload data = applyState.getReplicatedLogEntry().getData();
65 if( data instanceof ServerConfigurationPayload){
66 LOG.info("Server configuration : {} has been replicated to a majority of cluster servers succesfully",
67 (ServerConfigurationPayload)data);
68 // respond ok to follower
69 respondToClient(raftActor, ServerChangeStatus.OK);
78 private void onAddServer(AddServer addServer, RaftActor raftActor, ActorRef sender) {
79 LOG.debug("onAddServer: {}", addServer);
80 if(noLeaderOrForwardedToLeader(addServer, raftActor, sender)) {
84 CatchupFollowerInfo followerInfo = new CatchupFollowerInfo(addServer,sender);
85 boolean process = !followerInfoQueue.isEmpty();
86 followerInfoQueue.add(followerInfo);
88 processAddServer(raftActor);
92 private void processAddServer(RaftActor raftActor){
93 LOG.debug("In processAddServer");
94 AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
95 AddServer addSrv = followerInfoQueue.peek().getAddServer();
96 context.addToPeers(addSrv.getNewServerId(), addSrv.getNewServerAddress());
98 // if voting member - initialize to VOTING_NOT_INITIALIZED
99 FollowerState initialState = addSrv.isVotingMember() ? FollowerState.VOTING_NOT_INITIALIZED :
100 FollowerState.NON_VOTING;
101 leader.addFollower(addSrv.getNewServerId(), initialState);
104 // if initialState == FollowerState.VOTING_NOT_INITIALIZED
105 // Initiate snapshot via leader.initiateCaptureSnapshot(addServer.getNewServerId())
106 // Start a timer to abort the operation after a period of time (maybe 2 times election timeout)
107 // Set local instance state and wait for message from the AbstractLeader when install snapshot
108 // is done and return now
109 // When install snapshot message is received, go to step 1
113 // 1) tell AbstractLeader mark the follower as VOTING and recalculate minReplicationCount and
114 // minIsolatedLeaderPeerCount
115 // 2) persist and replicate ServerConfigurationPayload via
116 // raftActor.persistData(sender, uuid, newServerConfigurationPayload)
117 // 3) Wait for commit complete via ApplyState message in RaftActor or time it out. In RaftActor,
118 // on ApplyState, check if ReplicatedLogEntry payload is ServerConfigurationPayload and call
121 if(initialState == FollowerState.VOTING_NOT_INITIALIZED){
122 LOG.debug("Leader sending initiate capture snapshot to follower : {}", addSrv.getNewServerId());
123 leader.initiateCaptureSnapshot(addSrv.getNewServerId());
124 // schedule the catchup timeout timer
125 followerTimeout = context.getActorSystem().scheduler()
126 .scheduleOnce(new FiniteDuration(((context.getConfigParams().getElectionTimeOutInterval().toMillis()) * 2),
127 TimeUnit.MILLISECONDS),
128 context.getActor(), new FollowerCatchUpTimeout(addSrv.getNewServerId()),
129 context.getActorSystem().dispatcher(), context.getActor());
131 LOG.debug("Directly persisting the new server configuration : {}", addSrv.getNewServerId());
132 persistNewServerConfiguration(raftActor, followerInfoQueue.peek().getClientRequestor(),
133 addSrv.getNewServerId());
137 private boolean noLeaderOrForwardedToLeader(Object message, RaftActor raftActor, ActorRef sender) {
138 if (raftActor.isLeader()) {
142 ActorSelection leader = raftActor.getLeader();
143 if (leader != null) {
144 LOG.debug("Not leader - forwarding to leader {}", leader);
145 leader.forward(message, raftActor.getContext());
147 LOG.debug("No leader - returning NO_LEADER AddServerReply");
148 sender.tell(new AddServerReply(ServerChangeStatus.NO_LEADER, null), raftActor.self());
154 private void onUnInitializedFollowerSnapshotReply(UnInitializedFollowerSnapshotReply reply,
155 RaftActor raftActor, ActorRef sender){
157 String followerId = reply.getFollowerId();
158 AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
159 FollowerLogInformation followerLogInformation = leader.getFollower(followerId);
161 followerLogInformation.setFollowerState(FollowerState.VOTING);
162 leader.updateMinReplicaCountAndMinIsolatedLeaderPeerCount();
163 persistNewServerConfiguration(raftActor, sender, followerId);
166 private void persistNewServerConfiguration(RaftActor raftActor, ActorRef sender, String followerId){
167 /* get old server configuration list */
168 Map<String, String> tempMap = context.getPeerAddresses();
169 List<String> cOld = new ArrayList<String>();
170 for (Map.Entry<String, String> entry : tempMap.entrySet()) {
171 if(!entry.getKey().equals(followerId)){
172 cOld.add(entry.getKey());
175 LOG.debug("Cold server configuration : {}", cOld.toString());
176 /* get new server configuration list */
177 List <String> cNew = new ArrayList<String>(cOld);
178 cNew.add(followerId);
179 LOG.debug("Cnew server configuration : {}", cNew.toString());
180 // construct the peer list
181 ServerConfigurationPayload servPayload = new ServerConfigurationPayload(cOld, cNew);
182 /* TODO - persist new configuration - CHECK WHETHER USING getId below is correct */
183 raftActor.persistData(sender, context.getId(), servPayload);
186 private void stopFollowerTimer() {
187 if (followerTimeout != null && !followerTimeout.isCancelled()) {
188 followerTimeout.cancel();
192 private void onFollowerCatchupTimeout(RaftActor raftActor, ActorRef sender, String serverId){
194 LOG.debug("onFollowerCatchupTimeout: {}", serverId);
195 AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
197 context.removePeer(serverId);
198 leader.removeFollower(serverId);
199 LOG.warn("onFollowerCatchupTimeout - Timeout occured for server - {} while installing snapshot", serverId);
200 respondToClient(raftActor,ServerChangeStatus.TIMEOUT);
203 private void respondToClient(RaftActor raftActor, ServerChangeStatus result){
205 int size = followerInfoQueue.size();
207 // remove the entry from the queue
208 CatchupFollowerInfo fInfo = followerInfoQueue.remove();
210 ActorRef toClient = fInfo.getClientRequestor();
212 toClient.tell(new AddServerReply(result, raftActor.getLeaderId()), raftActor.self());
213 LOG.debug("Response returned is {} for server {} ", result, fInfo.getAddServer().getNewServerId());
214 if(!followerInfoQueue.isEmpty()){
215 processAddServer(raftActor);
219 // maintain sender actorRef
220 private class CatchupFollowerInfo {
221 private final AddServer addServer;
222 private final ActorRef clientRequestor;
224 CatchupFollowerInfo(AddServer addSrv, ActorRef cliReq){
226 clientRequestor = cliReq;
228 public AddServer getAddServer(){
231 public ActorRef getClientRequestor(){
232 return clientRequestor;