2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft;
10 import akka.actor.ActorRef;
11 import akka.actor.ActorSelection;
12 import akka.actor.Cancellable;
13 import java.util.ArrayList;
14 import java.util.Collections;
15 import java.util.LinkedList;
16 import java.util.List;
17 import java.util.Queue;
18 import java.util.UUID;
19 import java.util.concurrent.TimeUnit;
20 import org.opendaylight.controller.cluster.raft.FollowerLogInformation.FollowerState;
21 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
22 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
23 import org.opendaylight.controller.cluster.raft.messages.AddServer;
24 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
25 import org.opendaylight.controller.cluster.raft.messages.FollowerCatchUpTimeout;
26 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
27 import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
28 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31 import scala.concurrent.duration.FiniteDuration;
34 * Handles server configuration related messages for a RaftActor.
36 * @author Thomas Pantelis
38 class RaftActorServerConfigurationSupport {
39 private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupport.class);
40 private final RaftActorContext context;
41 // client follower queue
42 private final Queue<CatchupFollowerInfo> followerInfoQueue = new LinkedList<CatchupFollowerInfo>();
44 private Cancellable followerTimeout = null;
/**
 * Creates the support instance.
 *
 * @param context the context of the RaftActor whose server configuration is managed
 */
RaftActorServerConfigurationSupport(RaftActorContext context) {
    this.context = context;
}
50 boolean handleMessage(Object message, RaftActor raftActor, ActorRef sender) {
51 if(message instanceof AddServer) {
52 onAddServer((AddServer)message, raftActor, sender);
54 } else if (message instanceof FollowerCatchUpTimeout){
55 FollowerCatchUpTimeout followerTimeout = (FollowerCatchUpTimeout)message;
56 // abort follower catchup on timeout
57 onFollowerCatchupTimeout(raftActor, sender, followerTimeout.getNewServerId());
59 } else if (message instanceof UnInitializedFollowerSnapshotReply){
60 // snapshot installation is successful
61 onUnInitializedFollowerSnapshotReply((UnInitializedFollowerSnapshotReply)message, raftActor,sender);
63 } else if(message instanceof ApplyState) {
64 return onApplyState((ApplyState) message, raftActor);
70 private boolean onApplyState(ApplyState applyState, RaftActor raftActor) {
71 Payload data = applyState.getReplicatedLogEntry().getData();
72 if(data instanceof ServerConfigurationPayload) {
73 CatchupFollowerInfo followerInfo = followerInfoQueue.peek();
74 if(followerInfo != null && followerInfo.getContextId().equals(applyState.getIdentifier())) {
75 LOG.info("{} has been successfully replicated to a majority of followers", data);
77 // respond ok to follower
78 respondToClient(raftActor, ServerChangeStatus.OK);
87 private void onAddServer(AddServer addServer, RaftActor raftActor, ActorRef sender) {
88 LOG.debug("{}: onAddServer: {}", context.getId(), addServer);
89 if(noLeaderOrForwardedToLeader(addServer, raftActor, sender)) {
93 CatchupFollowerInfo followerInfo = new CatchupFollowerInfo(addServer,sender);
94 boolean process = followerInfoQueue.isEmpty();
95 followerInfoQueue.add(followerInfo);
97 processAddServer(raftActor);
102 * The algorithm for AddServer is as follows:
104 * <li>Add the new server as a peer.</li>
105 * <li>Add the new follower to the leader.</li>
106 * <li>If new server should be voting member</li>
108 * <li>Initialize FollowerState to VOTING_NOT_INITIALIZED.</li>
109 * <li>Initiate install snapshot to the new follower.</li>
110 * <li>When install snapshot complete, mark the follower as VOTING and re-calculate majority vote count.</li>
112 * <li>Persist and replicate ServerConfigurationPayload with the new server list.</li>
113 * <li>On replication consensus, respond to caller with OK.</li>
115 * If the install snapshot times out after a period of 2 * election time out
117 * <li>Remove the new server as a peer.</li>
118 * <li>Remove the new follower from the leader.</li>
119 * <li>Respond to caller with TIMEOUT.</li>
122 private void processAddServer(RaftActor raftActor){
123 LOG.debug("{}: In processAddServer", context.getId());
125 AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
126 CatchupFollowerInfo followerInfo = followerInfoQueue.peek();
127 AddServer addSrv = followerInfo.getAddServer();
128 context.addToPeers(addSrv.getNewServerId(), addSrv.getNewServerAddress());
130 // if voting member - initialize to VOTING_NOT_INITIALIZED
131 FollowerState initialState = addSrv.isVotingMember() ? FollowerState.VOTING_NOT_INITIALIZED :
132 FollowerState.NON_VOTING;
133 leader.addFollower(addSrv.getNewServerId(), initialState);
135 if(initialState == FollowerState.VOTING_NOT_INITIALIZED){
136 LOG.debug("Leader sending initiate capture snapshot to follower : {}", addSrv.getNewServerId());
137 leader.initiateCaptureSnapshot(addSrv.getNewServerId());
138 // schedule the catchup timeout timer
139 followerTimeout = context.getActorSystem().scheduler()
140 .scheduleOnce(new FiniteDuration(((context.getConfigParams().getElectionTimeOutInterval().toMillis()) * 2),
141 TimeUnit.MILLISECONDS),
142 context.getActor(), new FollowerCatchUpTimeout(addSrv.getNewServerId()),
143 context.getActorSystem().dispatcher(), context.getActor());
145 LOG.debug("Directly persisting the new server configuration : {}", addSrv.getNewServerId());
146 persistNewServerConfiguration(raftActor, followerInfo);
150 private boolean noLeaderOrForwardedToLeader(Object message, RaftActor raftActor, ActorRef sender) {
151 if (raftActor.isLeader()) {
155 ActorSelection leader = raftActor.getLeader();
156 if (leader != null) {
157 LOG.debug("Not leader - forwarding to leader {}", leader);
158 leader.forward(message, raftActor.getContext());
160 LOG.debug("No leader - returning NO_LEADER AddServerReply");
161 sender.tell(new AddServerReply(ServerChangeStatus.NO_LEADER, null), raftActor.self());
167 private void onUnInitializedFollowerSnapshotReply(UnInitializedFollowerSnapshotReply reply,
168 RaftActor raftActor, ActorRef sender){
169 CatchupFollowerInfo followerInfo = followerInfoQueue.peek();
170 // Sanity check - it's possible we get a reply after it timed out.
171 if(followerInfo == null) {
175 String followerId = reply.getFollowerId();
176 AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
177 FollowerLogInformation followerLogInformation = leader.getFollower(followerId);
179 followerLogInformation.setFollowerState(FollowerState.VOTING);
180 leader.updateMinReplicaCountAndMinIsolatedLeaderPeerCount();
182 persistNewServerConfiguration(raftActor, followerInfo);
185 private void persistNewServerConfiguration(RaftActor raftActor, CatchupFollowerInfo followerInfo){
186 List <String> cNew = new ArrayList<String>(context.getPeerAddresses().keySet());
187 cNew.add(context.getId());
189 LOG.debug("New server configuration : {}", cNew.toString());
191 ServerConfigurationPayload servPayload = new ServerConfigurationPayload(cNew, Collections.<String>emptyList());
193 raftActor.persistData(followerInfo.getClientRequestor(), followerInfo.getContextId(), servPayload);
196 private void stopFollowerTimer() {
197 if (followerTimeout != null && !followerTimeout.isCancelled()) {
198 followerTimeout.cancel();
202 private void onFollowerCatchupTimeout(RaftActor raftActor, ActorRef sender, String serverId){
203 LOG.debug("onFollowerCatchupTimeout: {}", serverId);
204 AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
206 context.removePeer(serverId);
207 leader.removeFollower(serverId);
208 LOG.warn("Timeout occured for new server {} while installing snapshot", serverId);
209 respondToClient(raftActor,ServerChangeStatus.TIMEOUT);
212 private void respondToClient(RaftActor raftActor, ServerChangeStatus result){
213 // remove the entry from the queue
214 CatchupFollowerInfo fInfo = followerInfoQueue.remove();
217 ActorRef toClient = fInfo.getClientRequestor();
219 toClient.tell(new AddServerReply(result, raftActor.getLeaderId()), raftActor.self());
220 LOG.debug("Response returned is {} for server {} ", result, fInfo.getAddServer().getNewServerId());
221 if(!followerInfoQueue.isEmpty()){
222 processAddServer(raftActor);
226 // maintain sender actorRef
227 private static class CatchupFollowerInfo {
228 private final AddServer addServer;
229 private final ActorRef clientRequestor;
230 private final String contextId;
232 CatchupFollowerInfo(AddServer addSrv, ActorRef cliReq){
234 clientRequestor = cliReq;
235 contextId = UUID.randomUUID().toString();
238 String getContextId() {
242 AddServer getAddServer(){
246 ActorRef getClientRequestor(){
247 return clientRequestor;